363 lines
10 KiB
Python
363 lines
10 KiB
Python
import osmnx as ox
|
|
import json
|
|
import os
|
|
import networkx as nx
|
|
from shapely.ops import unary_union
|
|
from scipy.spatial import cKDTree
|
|
import re
|
|
import numpy as np
|
|
import pandas as pd
|
|
|
|
# ==========================================
|
|
# 1. Configuration
|
|
# ==========================================
|
|
PLACE_NAME = "Wisconsin State Capitol, Madison, USA"
|
|
DIST = 3000
|
|
|
|
# Road width settings (meters)
|
|
LANE_WIDTH_DEFAULT = 3.5
|
|
DEFAULT_WIDTHS = {
|
|
"motorway": 12,
|
|
"trunk": 11,
|
|
"primary": 10,
|
|
"secondary": 9,
|
|
"tertiary": 8,
|
|
"residential": 6,
|
|
"service": 4,
|
|
"unclassified": 5,
|
|
"cycleway": 2,
|
|
"footway": 1.5,
|
|
"path": 1.5,
|
|
}
|
|
|
|
|
|
# Census/Zoning Simulation Settings
|
|
# Approximate density per cubic meter of building volume
|
|
POP_DENSITY_FACTOR = 0.05 # People per m3 (Residential)
|
|
JOB_DENSITY_FACTOR = 0.08 # Jobs per m3 (Commercial)
|
|
|
|
|
|
# ==========================================
|
|
# 2. Helpers
|
|
# ==========================================
|
|
def get_height(row):
|
|
h = 8.0
|
|
if "height" in row and str(row["height"]).lower() != "nan":
|
|
try:
|
|
clean = "".join(
|
|
filter(lambda x: x.isdigit() or x == ".", str(row["height"]))
|
|
)
|
|
h = float(clean)
|
|
except:
|
|
pass
|
|
elif "building:levels" in row and str(row["building:levels"]).lower() != "nan":
|
|
try:
|
|
clean = "".join(
|
|
filter(lambda x: x.isdigit() or x == ".", str(row["building:levels"]))
|
|
)
|
|
h = float(clean) * 3.5
|
|
except:
|
|
pass
|
|
return round(h, 1)
|
|
|
|
|
|
def estimate_road_width(row):
|
|
for key in ["width", "width:carriageway", "est_width"]:
|
|
if key in row and str(row[key]) != "nan":
|
|
val_str = str(row[key]).lower()
|
|
try:
|
|
nums = re.findall(r"[-+]?\d*\.\d+|\d+", val_str)
|
|
if nums:
|
|
val = float(nums[0])
|
|
if "'" in val_str or "ft" in val_str or "feet" in val_str:
|
|
val *= 0.3048
|
|
elif val > 50:
|
|
val *= 0.3048
|
|
return val
|
|
except:
|
|
pass
|
|
|
|
if "lanes" in row and str(row["lanes"]) != "nan":
|
|
try:
|
|
clean = re.findall(r"\d+", str(row["lanes"]))
|
|
if clean:
|
|
lanes = int(clean[0])
|
|
lanes = max(1, min(lanes, 6))
|
|
return lanes * LANE_WIDTH_DEFAULT
|
|
except:
|
|
pass
|
|
|
|
highway = row.get("highway", "residential")
|
|
if isinstance(highway, list):
|
|
highway = highway[0]
|
|
return DEFAULT_WIDTHS.get(highway, 4.0)
|
|
|
|
|
|
def classify_building(row, height, area):
|
|
b_type = str(row.get("building", "yes")).lower()
|
|
amenity = str(row.get("amenity", "")).lower()
|
|
office = str(row.get("office", "")).lower()
|
|
shop = str(row.get("shop", "")).lower()
|
|
|
|
volume = area * height
|
|
|
|
residential_tags = [
|
|
"apartments",
|
|
"residential",
|
|
"house",
|
|
"detached",
|
|
"terrace",
|
|
"dormitory",
|
|
"hotel",
|
|
]
|
|
commercial_tags = [
|
|
"commercial",
|
|
"office",
|
|
"retail",
|
|
"industrial",
|
|
"university",
|
|
"school",
|
|
"hospital",
|
|
"public",
|
|
]
|
|
|
|
is_res = any(t in b_type for t in residential_tags)
|
|
is_com = (
|
|
any(t in b_type for t in commercial_tags)
|
|
or (amenity != "nan" and amenity != "")
|
|
or (office != "nan" and office != "")
|
|
or (shop != "nan" and shop != "")
|
|
)
|
|
|
|
if not is_res and not is_com:
|
|
if volume > 5000:
|
|
is_com = True
|
|
else:
|
|
is_res = True
|
|
|
|
pop = 0
|
|
jobs = 0
|
|
category = "none"
|
|
density_score = 0
|
|
|
|
if is_res:
|
|
pop = round(volume * POP_DENSITY_FACTOR)
|
|
category = "residential"
|
|
density_score = min(1.0, pop / 500)
|
|
elif is_com:
|
|
jobs = round(volume * JOB_DENSITY_FACTOR)
|
|
category = "commercial"
|
|
density_score = min(1.0, jobs / 1000)
|
|
|
|
return category, density_score, pop, jobs
|
|
|
|
|
|
def parse_geometry(geom, center_x, center_y):
|
|
if geom.is_empty:
|
|
return []
|
|
polys = []
|
|
if geom.geom_type == "Polygon":
|
|
source_geoms = [geom]
|
|
elif geom.geom_type == "MultiPolygon":
|
|
source_geoms = geom.geoms
|
|
else:
|
|
return []
|
|
|
|
for poly in source_geoms:
|
|
outer = [
|
|
[round(x - center_x, 2), round(y - center_y, 2)]
|
|
for x, y in poly.exterior.coords
|
|
]
|
|
holes = []
|
|
for interior in poly.interiors:
|
|
hole_coords = [
|
|
[round(x - center_x, 2), round(y - center_y, 2)]
|
|
for x, y in interior.coords
|
|
]
|
|
holes.append(hole_coords)
|
|
polys.append({"outer": outer, "holes": holes})
|
|
return polys
|
|
|
|
|
|
def parse_line_points(geom, center_x, center_y):
|
|
if geom.geom_type == "LineString":
|
|
return [
|
|
[round(x - center_x, 2), round(y - center_y, 2)] for x, y in geom.coords
|
|
]
|
|
return []
|
|
|
|
|
|
# ==========================================
|
|
# 3. Execution
|
|
# ==========================================
|
|
print(f"1. Downloading Data for: {PLACE_NAME}...")
|
|
|
|
# Define valid tags lists for filtering later
|
|
PARK_TAGS_LEISURE = [
|
|
"park",
|
|
"garden",
|
|
"playground",
|
|
"golf_course",
|
|
"pitch",
|
|
"recreation_ground",
|
|
]
|
|
PARK_TAGS_LANDUSE = [
|
|
"grass",
|
|
"forest",
|
|
"park",
|
|
"meadow",
|
|
"village_green",
|
|
"recreation_ground",
|
|
"orchard",
|
|
]
|
|
NATURAL_TAGS = ["water", "bay", "coastline"]
|
|
|
|
tags_visual = {
|
|
"building": True,
|
|
"natural": NATURAL_TAGS,
|
|
"leisure": PARK_TAGS_LEISURE,
|
|
"landuse": PARK_TAGS_LANDUSE,
|
|
"amenity": True, # Needed for zoning, but MUST be filtered out of geometry
|
|
"office": True,
|
|
"shop": True,
|
|
}
|
|
gdf_visual = ox.features.features_from_address(PLACE_NAME, tags=tags_visual, dist=DIST)
|
|
|
|
print(" Downloading Road Graph...")
|
|
G = ox.graph.graph_from_address(PLACE_NAME, dist=DIST, network_type="drive")
|
|
gdf_nodes, gdf_edges = ox.graph_to_gdfs(G)
|
|
|
|
print("2. Projecting Coordinates...")
|
|
utm_crs = gdf_visual.estimate_utm_crs()
|
|
gdf_visual = gdf_visual.to_crs(utm_crs)
|
|
gdf_edges = gdf_edges.to_crs(utm_crs)
|
|
gdf_nodes = gdf_nodes.to_crs(utm_crs)
|
|
|
|
center_x = gdf_visual.geometry.centroid.x.mean()
|
|
center_y = gdf_visual.geometry.centroid.y.mean()
|
|
|
|
output_visual = {"buildings": [], "water": [], "parks": [], "roads": []}
|
|
output_routing = {"nodes": {}, "edges": []}
|
|
|
|
building_data_points = []
|
|
|
|
print("3. Processing Visual Layers & Census Simulation...")
|
|
|
|
for idx, row in gdf_visual.iterrows():
|
|
# Only process polygons
|
|
if row.geometry.geom_type not in ["Polygon", "MultiPolygon"]:
|
|
continue
|
|
|
|
polygons = parse_geometry(row.geometry, center_x, center_y)
|
|
|
|
for poly_data in polygons:
|
|
# -----------------------------
|
|
# 1. BUILDINGS
|
|
# -----------------------------
|
|
if "building" in row and str(row["building"]) != "nan":
|
|
height = get_height(row)
|
|
area = row.geometry.area
|
|
cat, score, pop, jobs = classify_building(row, height, area)
|
|
|
|
cx = row.geometry.centroid.x - center_x
|
|
cy = row.geometry.centroid.y - center_y
|
|
building_data_points.append([cx, cy, pop, jobs])
|
|
|
|
output_visual["buildings"].append(
|
|
{
|
|
"shape": poly_data,
|
|
"height": height,
|
|
"data": {"type": cat, "density": score, "pop": pop, "jobs": jobs},
|
|
}
|
|
)
|
|
|
|
# -----------------------------
|
|
# 2. WATER
|
|
# -----------------------------
|
|
elif ("natural" in row and str(row["natural"]) in NATURAL_TAGS) or (
|
|
"water" in row and str(row["water"]) != "nan"
|
|
):
|
|
output_visual["water"].append({"shape": poly_data})
|
|
|
|
# -----------------------------
|
|
# 3. PARKS (STRICT FILTER)
|
|
# -----------------------------
|
|
elif ("leisure" in row and str(row["leisure"]) in PARK_TAGS_LEISURE) or (
|
|
"landuse" in row and str(row["landuse"]) in PARK_TAGS_LANDUSE
|
|
):
|
|
output_visual["parks"].append({"shape": poly_data})
|
|
|
|
# -----------------------------
|
|
# 4. IGNORE EVERYTHING ELSE
|
|
# -----------------------------
|
|
# Amenities, parking lots, and landuse=commercial fall here and are NOT drawn.
|
|
else:
|
|
pass
|
|
|
|
print(" Buffering roads...")
|
|
road_polys = []
|
|
for idx, row in gdf_edges.iterrows():
|
|
width = estimate_road_width(row)
|
|
buffered = row.geometry.buffer(width / 2, cap_style=2, join_style=2)
|
|
road_polys.append(buffered)
|
|
|
|
if road_polys:
|
|
merged_roads = unary_union(road_polys)
|
|
road_shapes = parse_geometry(merged_roads, center_x, center_y)
|
|
for shape in road_shapes:
|
|
output_visual["roads"].append({"shape": shape})
|
|
|
|
print("4. Mapping Census Data to Graph Nodes...")
|
|
if building_data_points:
|
|
b_coords = np.array([[b[0], b[1]] for b in building_data_points])
|
|
b_data = np.array([[b[2], b[3]] for b in building_data_points])
|
|
tree = cKDTree(b_coords)
|
|
|
|
for node_id, row in gdf_nodes.iterrows():
|
|
nx = row.geometry.x - center_x
|
|
ny = row.geometry.y - center_y
|
|
|
|
if building_data_points:
|
|
indices = tree.query_ball_point([nx, ny], r=100)
|
|
if indices:
|
|
nearby_stats = np.sum(b_data[indices], axis=0)
|
|
node_pop = int(nearby_stats[0])
|
|
node_jobs = int(nearby_stats[1])
|
|
else:
|
|
node_pop, node_jobs = 0, 0
|
|
else:
|
|
node_pop, node_jobs = 0, 0
|
|
|
|
output_routing["nodes"][int(node_id)] = {
|
|
"x": round(nx, 2),
|
|
"y": round(ny, 2),
|
|
"pop": node_pop,
|
|
"jobs": node_jobs,
|
|
}
|
|
|
|
for u, v, k in G.edges(keys=True):
|
|
try:
|
|
row = gdf_edges.loc[(u, v, k)]
|
|
if isinstance(row, (type(gdf_edges),)):
|
|
row = row.iloc[0]
|
|
except KeyError:
|
|
continue
|
|
|
|
output_routing["edges"].append(
|
|
{
|
|
"u": int(u),
|
|
"v": int(v),
|
|
"oneway": bool(row.get("oneway", False)),
|
|
"points": parse_line_points(row.geometry, center_x, center_y),
|
|
"length": round(float(row.geometry.length), 2),
|
|
}
|
|
)
|
|
|
|
out_dir = os.path.join(os.path.dirname(__file__), "../public")
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
with open(os.path.join(out_dir, "city_data.json"), "w") as f:
|
|
json.dump(output_visual, f)
|
|
with open(os.path.join(out_dir, "routing_graph.json"), "w") as f:
|
|
json.dump(output_routing, f)
|
|
print(f"Done! Exported to {out_dir}")
|