draw routes

This commit is contained in:
Evan Scamehorn
2025-11-26 21:26:41 -06:00
parent 059a58fbf7
commit 45b8811767
4 changed files with 485 additions and 332 deletions

View File

@@ -1,6 +1,10 @@
import osmnx as ox
import json
import os
import networkx as nx
from shapely.ops import unary_union
from shapely.geometry import Polygon, MultiPolygon
import re
# ==========================================
# 1. Configuration
@@ -8,48 +12,31 @@ import os
PLACE_NAME = "Wisconsin State Capitol, Madison, USA"
DIST = 3000
# ==========================================
# 2. Data Fetching
# ==========================================
print(f"Downloading data for {PLACE_NAME}...")
# Define tags for different layers
tags = {
"building": True,
"natural": ["water", "bay", "coastline"],
"landuse": ["grass", "forest", "park", "recreation_ground"],
"leisure": ["park", "garden"],
"highway": True,
# Road width settings (meters)
LANE_WIDTH_DEFAULT = 3.5
DEFAULT_WIDTHS = {
"motorway": 12,
"trunk": 11,
"primary": 10,
"secondary": 9,
"tertiary": 8,
"residential": 6,
"service": 4,
"unclassified": 5,
"cycleway": 2,
"footway": 1.5,
"path": 1.5,
}
try:
# OSMNX v2.0+
gdf = ox.features.features_from_address(PLACE_NAME, tags=tags, dist=DIST)
except AttributeError:
# OSMNX < v2.0
gdf = ox.features_from_address(PLACE_NAME, tags=tags, dist=DIST)
# ==========================================
# 3. Projection & Normalization
# 2. Helpers
# ==========================================
print("Projecting to local grid...")
gdf_proj = gdf.to_crs(gdf.estimate_utm_crs())
# Calculate center for (0,0,0) normalization
center_x = gdf_proj.geometry.centroid.x.mean()
center_y = gdf_proj.geometry.centroid.y.mean()
# ==========================================
# 4. Processing Functions
# ==========================================
def get_height(row):
"""Estimates building height from tags."""
h = 10.0 # Default
"""Estimates building height."""
h = 8.0
if "height" in row and str(row["height"]).lower() != "nan":
try:
# Extract numeric part
clean = "".join(
filter(lambda x: x.isdigit() or x == ".", str(row["height"]))
)
@@ -67,81 +54,173 @@ def get_height(row):
return round(h, 1)
def parse_polygon(geom):
"""Extracts exterior coordinates from a Polygon."""
if geom.is_empty:
return []
coords = list(geom.exterior.coords)
# Simplify slightly to reduce vertex count if needed, or keep raw
return [[round(x - center_x, 1), round(y - center_y, 1)] for x, y in coords]
def estimate_road_width(row):
"""Estimates width with US-unit safety checks."""
# 1. Explicit width tag
for key in ["width", "width:carriageway", "est_width"]:
if key in row and str(row[key]) != "nan":
val_str = str(row[key]).lower()
try:
nums = re.findall(r"[-+]?\d*\.\d+|\d+", val_str)
if nums:
val = float(nums[0])
if "'" in val_str or "ft" in val_str or "feet" in val_str:
val *= 0.3048
elif val > 50: # Sanity check for feet without units
val *= 0.3048
return val
except:
pass
# 2. Lanes
if "lanes" in row and str(row["lanes"]) != "nan":
try:
clean = re.findall(r"\d+", str(row["lanes"]))
if clean:
lanes = int(clean[0])
lanes = max(1, min(lanes, 6))
return lanes * LANE_WIDTH_DEFAULT
except:
pass
# 3. Default based on type
highway = row.get("highway", "residential")
if isinstance(highway, list):
highway = highway[0]
return DEFAULT_WIDTHS.get(highway, 4.0)
def parse_linestring(geom):
"""Extracts coordinates from a LineString."""
def parse_geometry(geom, center_x, center_y):
"""Parses geometry into {outer, holes} structure."""
if geom.is_empty:
return []
coords = list(geom.coords)
return [[round(x - center_x, 1), round(y - center_y, 1)] for x, y in coords]
polys = []
if geom.geom_type == "Polygon":
source_geoms = [geom]
elif geom.geom_type == "MultiPolygon":
source_geoms = geom.geoms
else:
return []
for poly in source_geoms:
outer = [
[round(x - center_x, 2), round(y - center_y, 2)]
for x, y in poly.exterior.coords
]
holes = []
for interior in poly.interiors:
hole_coords = [
[round(x - center_x, 2), round(y - center_y, 2)]
for x, y in interior.coords
]
holes.append(hole_coords)
polys.append({"outer": outer, "holes": holes})
return polys
def parse_line_points(geom, center_x, center_y):
"""Simple parser for LineStrings (Routing Graph)."""
if geom.geom_type == "LineString":
return [
[round(x - center_x, 2), round(y - center_y, 2)] for x, y in geom.coords
]
return []
# ==========================================
# 5. Categorization Loop
# 3. Execution
# ==========================================
output_data = {"buildings": [], "water": [], "parks": [], "roads": []}
print(f"1. Downloading Data for: {PLACE_NAME}...")
print("Processing geometries...")
tags_visual = {
"building": True,
"natural": ["water", "bay"],
"leisure": ["park", "garden"],
"landuse": ["grass", "forest", "park"],
}
gdf_visual = ox.features.features_from_address(PLACE_NAME, tags=tags_visual, dist=DIST)
for idx, row in gdf_proj.iterrows():
geom = row.geometry
print(" Downloading Road Graph...")
G = ox.graph.graph_from_address(PLACE_NAME, dist=DIST, network_type="drive")
gdf_nodes, gdf_edges = ox.graph_to_gdfs(G)
# Handle MultiPolygons by iterating over them
geoms = [geom] if geom.geom_type in ["Polygon", "LineString"] else []
if geom.geom_type == "MultiPolygon":
geoms = list(geom.geoms)
elif geom.geom_type == "MultiLineString":
geoms = list(geom.geoms)
print("2. Projecting Coordinates...")
utm_crs = gdf_visual.estimate_utm_crs()
gdf_visual = gdf_visual.to_crs(utm_crs)
gdf_edges = gdf_edges.to_crs(utm_crs)
gdf_nodes = gdf_nodes.to_crs(utm_crs)
for sub_geom in geoms:
# 1. BUILDINGS
center_x = gdf_visual.geometry.centroid.x.mean()
center_y = gdf_visual.geometry.centroid.y.mean()
output_visual = {"buildings": [], "water": [], "parks": [], "roads": []}
output_routing = {"nodes": {}, "edges": []}
print("3. Processing Visual Layers...")
for idx, row in gdf_visual.iterrows():
polygons = parse_geometry(row.geometry, center_x, center_y)
for poly_data in polygons:
# 1. Buildings
if "building" in row and str(row["building"]) != "nan":
if sub_geom.geom_type == "Polygon":
output_data["buildings"].append(
{"shape": parse_polygon(sub_geom), "height": get_height(row)}
)
output_visual["buildings"].append(
{"shape": poly_data, "height": get_height(row)}
)
# 2. WATER
elif ("natural" in row and row["natural"] in tags["natural"]) or (
# 2. Water (Explicit check for NaN)
elif ("natural" in row and str(row["natural"]) != "nan") or (
"water" in row and str(row["water"]) != "nan"
):
if sub_geom.geom_type == "Polygon":
output_data["water"].append({"shape": parse_polygon(sub_geom)})
output_visual["water"].append({"shape": poly_data})
# 3. PARKS / GREENSPACE
elif ("leisure" in row and row["leisure"] in tags["leisure"]) or (
"landuse" in row and row["landuse"] in tags["landuse"]
):
if sub_geom.geom_type == "Polygon":
output_data["parks"].append({"shape": parse_polygon(sub_geom)})
# 3. Parks (Fallback)
else:
output_visual["parks"].append({"shape": poly_data})
# 4. ROADS
elif "highway" in row and str(row["highway"]) != "nan":
if sub_geom.geom_type == "LineString":
output_data["roads"].append({"path": parse_linestring(sub_geom)})
print(" Buffering roads...")
road_polys = []
for idx, row in gdf_edges.iterrows():
width = estimate_road_width(row)
buffered = row.geometry.buffer(width / 2, cap_style=2, join_style=2)
road_polys.append(buffered)
# ==========================================
# 6. Save File
# ==========================================
output_path = os.path.join(os.path.dirname(__file__), "../public/city_data.json")
os.makedirs(os.path.dirname(output_path), exist_ok=True)
if road_polys:
print(" Merging road polygons...")
merged_roads = unary_union(road_polys)
road_shapes = parse_geometry(merged_roads, center_x, center_y)
for shape in road_shapes:
output_visual["roads"].append({"shape": shape})
with open(output_path, "w") as f:
json.dump(output_data, f)
print("4. Processing Routing Graph...")
for node_id, row in gdf_nodes.iterrows():
output_routing["nodes"][int(node_id)] = {
"x": round(row.geometry.x - center_x, 2),
"y": round(row.geometry.y - center_y, 2),
}
print(
f"Exported:"
f"\n Buildings: {len(output_data['buildings'])}"
f"\n Roads: {len(output_data['roads'])}"
f"\n Water: {len(output_data['water'])}"
f"\n Parks: {len(output_data['parks'])}"
)
print(f"Saved to {output_path}")
for u, v, k in G.edges(keys=True):
try:
row = gdf_edges.loc[(u, v, k)]
if isinstance(row, (type(gdf_edges),)):
row = row.iloc[0]
except KeyError:
continue
output_routing["edges"].append(
{
"u": int(u),
"v": int(v),
"oneway": bool(row.get("oneway", False)),
"points": parse_line_points(row.geometry, center_x, center_y),
}
)
out_dir = os.path.join(os.path.dirname(__file__), "../public")
os.makedirs(out_dir, exist_ok=True)
with open(os.path.join(out_dir, "city_data.json"), "w") as f:
json.dump(output_visual, f)
with open(os.path.join(out_dir, "routing_graph.json"), "w") as f:
json.dump(output_routing, f)
print(f"Done! Exported to {out_dir}")