"""
Road network visualization for terrain rendering using data layer pipeline.
This module provides functions to:
1. Fetch road data from OpenStreetMap Overpass API (with caching)
2. Rasterize roads as a data layer for terrain visualization
3. Apply roads through the transformation pipeline (reproject, downsample)
Roads are treated as a proper geographic data layer with coordinate system
and transform, ensuring correct alignment when downsampling or reprojecting.
Usage - Simple API:
from src.terrain.roads import get_roads, add_roads_layer
# Fetch roads for a bounding box (cached for 30 days)
bbox = (32.5, -117.6, 33.5, -116.0) # (south, west, north, east)
roads_geojson = get_roads(bbox, road_types=['motorway', 'trunk', 'primary'])
# Add to terrain
add_roads_layer(terrain, roads_geojson, bbox)
Usage - Large areas (tiled fetching):
from src.terrain.roads import get_roads_tiled
# Automatically tiles large areas to respect API limits
bbox = (37.0, -95.0, 45.0, -85.0) # Large multi-state area
roads = get_roads_tiled(bbox, tile_size=2.0)
Usage - Manual pipeline:
from src.terrain.roads import get_roads, rasterize_roads_to_layer
roads_geojson = get_roads(bbox)
road_grid, road_transform = rasterize_roads_to_layer(
roads_geojson, bbox, resolution=30 # 30m pixels
)
# Add as data layer (library handles transform automatically)
terrain.add_data_layer(
"roads",
road_grid,
road_transform,
"EPSG:4326",
target_layer="dem", # Align to DEM grid
)
"""
import hashlib
import json
import logging
import time
import numpy as np
from datetime import datetime, timedelta
from typing import Dict, Any, Optional, Tuple, List
from pathlib import Path
import requests
logger = logging.getLogger(__name__)
# =============================================================================
# ROAD DATA FETCHING (OpenStreetMap Overpass API)
# =============================================================================
def build_overpass_query(
bbox: Tuple[float, float, float, float],
road_types: List[str],
) -> str:
"""
Build Overpass QL query for road network data.
Args:
bbox: (south, west, north, east) in WGS84 (EPSG:4326)
road_types: List of OSM highway tags ('motorway', 'trunk', 'primary')
Returns:
Overpass QL query string
"""
south, west, north, east = bbox
bbox_str = f"{south},{west},{north},{east}"
# Build query for each road type
way_queries = []
for road_type in road_types:
way_queries.append(f' way["highway"="{road_type}"]({bbox_str});')
way_clauses = "\n".join(way_queries)
query = f"""
[out:json][timeout:60];
(
{way_clauses}
);
out geom;
"""
return query
def _compute_bbox_hash(bbox: Tuple[float, float, float, float]) -> str:
"""
Compute hash for bounding box (used for cache keys).
Args:
bbox: (south, west, north, east)
Returns:
SHA256 hash of bbox coordinates
"""
bbox_str = f"{bbox[0]:.6f},{bbox[1]:.6f},{bbox[2]:.6f},{bbox[3]:.6f}"
return hashlib.sha256(bbox_str.encode()).hexdigest()
def _get_cache_dir() -> Path:
"""
Get or create cache directory for road data.
Returns:
Path to cache directory (data/cache/roads/)
"""
cache_dir = Path.cwd() / "data" / "cache" / "roads"
cache_dir.mkdir(parents=True, exist_ok=True)
return cache_dir
def _load_cached_roads(
bbox: Tuple[float, float, float, float],
) -> Optional[Dict[str, Any]]:
"""
Load road data from cache if it exists and is fresh.
Args:
bbox: (south, west, north, east)
Returns:
GeoJSON dict if cache exists and is fresh (< 30 days), None otherwise
"""
bbox_hash = _compute_bbox_hash(bbox)
cache_dir = _get_cache_dir()
cache_file = cache_dir / f"roads_{bbox_hash}.geojson"
meta_file = cache_dir / f"roads_{bbox_hash}_meta.json"
if not cache_file.exists() or not meta_file.exists():
logger.debug(f" No cache found for bbox {bbox_hash[:8]}...")
return None
# Check cache expiration (30 days)
try:
with open(meta_file) as f:
meta = json.load(f)
created_time = datetime.fromisoformat(meta.get("created_at", ""))
age = datetime.now() - created_time
max_age = timedelta(days=30)
if age > max_age:
logger.debug(f" Cache expired ({age.days} days old)")
return None
logger.debug(f" Loading cached roads (age: {age.days} days)")
with open(cache_file) as f:
return json.load(f)
except Exception as e:
logger.warning(f" Error loading cache: {e}")
return None
def _cache_road_data(
bbox: Tuple[float, float, float, float],
geojson_data: Dict[str, Any],
) -> None:
"""
Save road data to cache.
Args:
bbox: (south, west, north, east)
geojson_data: GeoJSON FeatureCollection
"""
bbox_hash = _compute_bbox_hash(bbox)
cache_dir = _get_cache_dir()
cache_file = cache_dir / f"roads_{bbox_hash}.geojson"
meta_file = cache_dir / f"roads_{bbox_hash}_meta.json"
try:
# Save GeoJSON
with open(cache_file, "w") as f:
json.dump(geojson_data, f, indent=2)
# Save metadata
meta = {
"created_at": datetime.now().isoformat(),
"bbox": bbox,
"num_features": len(geojson_data.get("features", [])),
}
with open(meta_file, "w") as f:
json.dump(meta, f, indent=2)
logger.info(f" Cached {len(geojson_data.get('features', []))} roads")
except Exception as e:
logger.warning(f" Error caching road data: {e}")
def _fetch_roads_from_osm(
bbox: Tuple[float, float, float, float],
road_types: List[str],
timeout: int = 60,
) -> Optional[Dict[str, Any]]:
"""
Fetch road data from OpenStreetMap Overpass API.
Args:
bbox: (south, west, north, east) in WGS84
road_types: List of OSM highway tags
timeout: Request timeout in seconds
Returns:
GeoJSON FeatureCollection or None if fetch fails
"""
south, west, north, east = bbox
logger.info("Fetching road data from OpenStreetMap Overpass API...")
logger.info(f" Extent: lat [{south:.3f}, {north:.3f}], lon [{west:.3f}, {east:.3f}]")
logger.info(f" Road types: {', '.join(road_types)}")
query = build_overpass_query(bbox, road_types)
try:
overpass_url = "https://overpass-api.de/api/interpreter"
logger.debug(f" Sending query to {overpass_url} (timeout: {timeout}s)...")
response = requests.post(overpass_url, data={"data": query}, timeout=timeout)
response.raise_for_status()
data = response.json()
except requests.exceptions.Timeout:
logger.error(f" Overpass API timeout after {timeout}s")
return None
except requests.exceptions.HTTPError as e:
if response.status_code == 429:
logger.error(" Overpass API rate limited (429)")
else:
logger.error(f" Overpass API error: {response.status_code}")
return None
except Exception as e:
logger.error(f" Failed to fetch from Overpass API: {e}")
return None
# Parse OSM data into GeoJSON
features = []
for element in data.get("elements", []):
try:
if element.get("type") != "way":
continue
# Extract road geometry (coordinates)
nodes = element.get("nodes", [])
if not nodes or len(nodes) < 2:
continue
# Get coordinates from geometry if available
if "geometry" not in element:
continue
coords = [[node["lon"], node["lat"]] for node in element["geometry"]]
if len(coords) < 2:
continue
# Extract tags
tags = element.get("tags", {})
highway_type = tags.get("highway", "unknown")
name = tags.get("name", f"Road_{element['id']}")
ref = tags.get("ref", "")
# Create GeoJSON feature
feature = {
"type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": coords,
},
"properties": {
"osm_id": element["id"],
"name": name,
"ref": ref,
"highway": highway_type,
},
}
features.append(feature)
except Exception as e:
logger.debug(f" Skipped element {element.get('id')}: {e}")
continue
logger.info(f" Found {len(features)} road segments")
# Return as GeoJSON FeatureCollection
return {
"type": "FeatureCollection",
"features": features,
}
def get_roads(
bbox: Tuple[float, float, float, float],
road_types: Optional[List[str]] = None,
force_refresh: bool = False,
) -> Optional[Dict[str, Any]]:
"""
Get road network data for a bounding box from OpenStreetMap.
Fetches from cache if available, otherwise queries Overpass API.
Results are cached locally for 30 days.
Args:
bbox: (south, west, north, east) in WGS84 (EPSG:4326)
road_types: OSM highway tags to include. Default: ['motorway', 'trunk', 'primary']
Common types:
- 'motorway': Interstates, major freeways
- 'trunk': Major state routes
- 'primary': Primary state/county roads
- 'secondary': Secondary roads
- 'tertiary': Local connecting roads
force_refresh: Force fresh fetch, skip cache
Returns:
GeoJSON FeatureCollection with road LineStrings, or None if fetch fails
Example:
>>> from src.terrain.roads import get_roads
>>> bbox = (32.5, -117.6, 33.5, -116.0) # San Diego
>>> roads = get_roads(bbox, road_types=['motorway', 'trunk'])
>>> print(f"Found {len(roads['features'])} road segments")
"""
if road_types is None:
road_types = ["motorway", "trunk", "primary"]
logger.info(f"Loading roads for bbox: {bbox}")
# Try cache first
if not force_refresh:
cached = _load_cached_roads(bbox)
if cached is not None:
logger.info(f" Using cached roads ({len(cached['features'])} segments)")
return cached
# Fetch from API
start_time = time.time()
geojson = _fetch_roads_from_osm(bbox, road_types)
if geojson is None:
logger.warning(" Failed to fetch roads from API, returning empty collection")
return {"type": "FeatureCollection", "features": []}
elapsed = time.time() - start_time
logger.info(f" Fetch completed in {elapsed:.1f}s")
# Cache the result
_cache_road_data(bbox, geojson)
return geojson
[docs]
def get_roads_tiled(
bbox: Tuple[float, float, float, float],
road_types: Optional[List[str]] = None,
tile_size: float = 2.0,
retry_count: int = 1,
retry_delay: float = 2.0,
force_refresh: bool = False,
) -> Optional[Dict[str, Any]]:
"""
Fetch roads for large areas by tiling and merging results.
Works globally - anywhere OpenStreetMap has road coverage.
Automatically tiles large bounding boxes to respect Overpass API limits
and handles retries for failed tiles.
Args:
bbox: (south, west, north, east) in WGS84 - works for any location worldwide
road_types: OSM highway tags to include. Default: ['motorway', 'trunk', 'primary']
tile_size: Size of tiles in degrees (default: 2.0° recommended for Overpass API)
retry_count: Number of retries for failed tiles (default: 1)
retry_delay: Delay in seconds before retrying (default: 2.0)
force_refresh: Force fresh fetch, skip cache (default: False)
Returns:
Merged GeoJSON FeatureCollection with all road segments from all tiles,
or empty FeatureCollection if all fetches fail.
Example:
>>> # Large area covering multiple US states
>>> bbox = (37.0, -95.0, 45.0, -85.0)
>>> roads = get_roads_tiled(bbox, road_types=["motorway", "trunk"])
>>> print(f"Found {len(roads['features'])} road segments")
>>> # Works globally - Alps region
>>> alps_bbox = (45.5, 6.0, 47.5, 14.0)
>>> roads = get_roads_tiled(alps_bbox, tile_size=1.5)
"""
import math
if road_types is None:
road_types = ["motorway", "trunk", "primary"]
south, west, north, east = bbox
lat_span = north - south
lon_span = east - west
logger.info(f"Fetching roads for bbox: lat [{south:.2f}, {north:.2f}], lon [{west:.2f}, {east:.2f}]")
logger.info(f" Coverage: {lat_span:.1f}° lat × {lon_span:.1f}° lon")
# Check if tiling is needed
if lat_span <= tile_size and lon_span <= tile_size:
# Small bbox - single fetch
logger.info(" Small area - single fetch (no tiling needed)")
result = get_roads(bbox, road_types, force_refresh=force_refresh)
return result if result else {"type": "FeatureCollection", "features": []}
# Large bbox - tile and merge
lat_tiles = int(math.ceil(lat_span / tile_size))
lon_tiles = int(math.ceil(lon_span / tile_size))
total_tiles = lat_tiles * lon_tiles
logger.info(f" Large area - fetching in {tile_size}° tiles ({lat_tiles}×{lon_tiles} = {total_tiles} tiles)")
all_features = []
failed_tiles = []
# Fetch all tiles
for lat_idx in range(lat_tiles):
for lon_idx in range(lon_tiles):
tile_south = south + lat_idx * tile_size
tile_north = min(tile_south + tile_size, north)
tile_west = west + lon_idx * tile_size
tile_east = min(tile_west + tile_size, east)
tile_bbox = (tile_south, tile_west, tile_north, tile_east)
tile_num = lat_idx * lon_tiles + lon_idx + 1
logger.info(f" Tile {tile_num}/{total_tiles}: lat [{tile_south:.2f}, {tile_north:.2f}], lon [{tile_west:.2f}, {tile_east:.2f}]")
tile_roads = get_roads(tile_bbox, road_types, force_refresh=force_refresh)
if tile_roads and tile_roads.get("features"):
feature_count = len(tile_roads["features"])
all_features.extend(tile_roads["features"])
logger.info(f" ✓ {feature_count} road segments")
else:
failed_tiles.append((tile_num, tile_bbox))
logger.warning(f" ✗ No roads returned (timeout or error)")
# Retry failed tiles
if failed_tiles and retry_count > 0:
logger.info(f" Retrying {len(failed_tiles)} failed tiles...")
time.sleep(retry_delay)
still_failed = []
for tile_num, tile_bbox in failed_tiles:
logger.info(f" Retry tile {tile_num}...")
tile_roads = get_roads(tile_bbox, road_types, force_refresh=True)
if tile_roads and tile_roads.get("features"):
feature_count = len(tile_roads["features"])
all_features.extend(tile_roads["features"])
logger.info(f" ✓ Retry succeeded: {feature_count} segments")
else:
still_failed.append(tile_num)
logger.warning(f" ✗ Retry failed")
if still_failed:
logger.warning(f" {len(still_failed)} tiles failed after retry: {still_failed}")
logger.info(f" Loaded {len(all_features)} total road segments from {total_tiles} tiles")
return {"type": "FeatureCollection", "features": all_features}
# =============================================================================
# ROAD VISUALIZATION
# =============================================================================
[docs]
def road_colormap(road_grid, score=None):
"""
Map roads to a distinctive red color for special material treatment.
The red color (180, 30, 30) is used as a marker so the Blender material
shader can identify road pixels and apply a glassy/emissive effect.
Args:
road_grid: 2D array of road values (0=no road, >0=road)
score: Unused, kept for API compatibility.
Returns:
Array of RGB colors with shape (height, width, 3) as uint8
"""
# Create road mask (any road value > 0)
road_mask = road_grid > 0.5 # Use 0.5 threshold to handle resampling artifacts
height, width = road_grid.shape
colors = np.zeros((height, width, 3), dtype=np.uint8)
# Deep red for roads - will be made glassy/emissive by material shader
road_red_color = (180, 30, 30)
colors[road_mask] = road_red_color
return colors
[docs]
def get_viridis_colormap():
"""
Get viridis colormap function.
Returns:
Function that maps normalized values (0-1) to (R, G, B)
"""
try:
import matplotlib.pyplot as plt
viridis = plt.colormaps.get_cmap('viridis')
return lambda x: viridis(np.clip(x, 0, 1))[:3]
except ImportError:
logger.warning("matplotlib not available, using simple grayscale colormap")
return lambda x: (x, x, x)
[docs]
def smooth_road_mask(
road_mask: np.ndarray,
sigma: float = 1.0,
) -> np.ndarray:
"""
Apply Gaussian blur to road mask for anti-aliased edges (GPU-accelerated).
The Bresenham line algorithm creates stair-step (aliased) edges.
Applying Gaussian smoothing creates soft anti-aliased boundaries that
render more smoothly, especially after the mask goes through resampling.
Uses PyTorch GPU acceleration when available (6x speedup on CUDA).
Args:
road_mask: 2D array of road values (0=no road, >0=road)
sigma: Gaussian blur sigma in pixels (default: 1.0).
Higher values = softer edges. Typical range: 0.5-2.0.
- 0.5: Minimal softening
- 1.0: Standard anti-aliasing (recommended)
- 2.0: Very soft/blurry edges
Returns:
Smoothed road mask as float32 array. Values are now continuous
(not binary) and may need thresholding if binary mask is needed.
"""
from src.terrain.gpu_ops import gpu_gaussian_blur
if sigma <= 0:
return road_mask.astype(np.float32)
logger.info(f"Anti-aliasing road mask (sigma={sigma})...")
# Convert to float for smooth blending
mask_float = road_mask.astype(np.float32)
# Apply Gaussian blur (GPU-accelerated)
smoothed = gpu_gaussian_blur(mask_float, sigma=sigma)
# Preserve original values where roads are strong, blend at edges
# This keeps road centers at original intensity while softening edges
road_pixels = np.sum(road_mask > 0.5)
edge_pixels = np.sum((smoothed > 0.1) & (smoothed < 0.9))
logger.info(f"✓ Road mask anti-aliased: {road_pixels} road pixels, {edge_pixels} edge pixels")
return smoothed
[docs]
def smooth_dem_along_roads(
dem: np.ndarray,
road_mask: np.ndarray,
smoothing_radius: int = 2,
) -> np.ndarray:
"""
Smooth the DEM along roads to reduce elevation detail (GPU-accelerated).
Applies Gaussian smoothing only to road pixels. Non-road pixels
are unchanged. The smoothing kernel should be about half the road width.
Uses PyTorch GPU acceleration when available (6x speedup on CUDA).
Args:
dem: 2D array of elevation values
road_mask: 2D boolean or float array where >0.5 = road
smoothing_radius: Radius for Gaussian smoothing (default: 2 pixels)
Returns:
Smoothed DEM array (same shape as input)
"""
from src.terrain.gpu_ops import gpu_gaussian_blur
logger.info(f"Smoothing DEM along roads (radius={smoothing_radius})...")
# Create binary road mask
road_binary = road_mask > 0.5
# Apply Gaussian smoothing to entire DEM (GPU-accelerated)
smoothed_dem = gpu_gaussian_blur(dem.astype(np.float32), sigma=float(smoothing_radius))
# Only replace road pixels with smoothed values
result = dem.copy()
result[road_binary] = smoothed_dem[road_binary]
road_pixels = np.sum(road_binary)
logger.info(f"✓ Smoothed {road_pixels} road pixels")
return result.astype(dem.dtype)
[docs]
def smooth_road_vertices(
vertices: np.ndarray,
road_mask: np.ndarray,
y_valid: np.ndarray,
x_valid: np.ndarray,
smoothing_radius: int = 2,
) -> np.ndarray:
"""
Smooth Z coordinates of mesh vertices that are on roads.
Reconstructs a 2D Z grid from vertex positions, applies 2D Gaussian
smoothing in grid space, then writes smoothed values back to road
vertices only. Non-road vertices are unchanged.
This ensures smoothing follows the spatial layout of the road rather
than the arbitrary vertex index order.
Args:
vertices: Mesh vertex positions (N, 3) array with [x, y, z] coords
road_mask: 2D array (H, W) where >0.5 indicates road pixels
y_valid: Array (N,) of y indices mapping vertices to road_mask rows
x_valid: Array (N,) of x indices mapping vertices to road_mask columns
smoothing_radius: Gaussian smoothing sigma (default: 2, use 0 to disable)
Returns:
Modified vertices array with smoothed Z values on roads.
X and Y coordinates are never modified.
"""
from scipy.ndimage import gaussian_filter
# Return unchanged if no smoothing
if smoothing_radius <= 0:
return vertices.copy()
result = vertices.copy()
n_surface_vertices = len(y_valid)
h, w = road_mask.shape
# Find which surface vertices are on roads
in_bounds = (
(y_valid >= 0) & (y_valid < h) &
(x_valid >= 0) & (x_valid < w)
)
road_vertex_mask = np.zeros(n_surface_vertices, dtype=bool)
road_vertex_mask[in_bounds] = road_mask[y_valid[in_bounds], x_valid[in_bounds]] > 0.5
road_indices = np.where(road_vertex_mask)[0]
if len(road_indices) == 0:
return result
# Reconstruct 2D Z grid from vertex positions
z_grid = np.full((h, w), np.nan, dtype=np.float64)
z_grid[y_valid[in_bounds], x_valid[in_bounds]] = result[
np.where(in_bounds)[0], 2
]
# Create a road-only Z grid: set non-road pixels to NaN so the Gaussian
# only averages road Z values with neighboring road Z values.
# To handle NaN in gaussian_filter, use the normalized convolution trick:
# smoothed = convolve(data_with_zeros) / convolve(mask)
is_road = road_mask > 0.5
valid_road = is_road & ~np.isnan(z_grid)
z_for_smooth = np.where(valid_road, z_grid, 0.0)
weight = valid_road.astype(np.float64)
sigma = float(smoothing_radius)
z_smoothed = gaussian_filter(z_for_smooth, sigma=sigma, mode='nearest')
w_smoothed = gaussian_filter(weight, sigma=sigma, mode='nearest')
# Avoid division by zero where no valid road neighbors exist
safe_mask = w_smoothed > 1e-10
z_result = np.where(safe_mask, z_smoothed / w_smoothed, z_grid)
# Write smoothed Z back to road vertices only
result[road_indices, 2] = z_result[
y_valid[road_indices], x_valid[road_indices]
]
logger.info(f"✓ Smoothed {len(road_indices)} road vertices in 2D grid space "
f"(sigma={smoothing_radius})")
return result
[docs]
def offset_road_vertices(
vertices: np.ndarray,
road_mask: np.ndarray,
y_valid: np.ndarray,
x_valid: np.ndarray,
offset: float = 0.0,
) -> np.ndarray:
"""
Offset Z coordinates of mesh vertices that are on roads by a fixed amount.
A simpler alternative to smooth_road_vertices. Raises or lowers all road
vertices by a constant offset, making roads visually distinct from terrain.
Args:
vertices: Mesh vertex positions (N, 3) array with [x, y, z] coords
road_mask: 2D array (H, W) where >0.5 indicates road pixels
y_valid: Array (N,) of y indices mapping vertices to road_mask rows
x_valid: Array (N,) of x indices mapping vertices to road_mask columns
offset: Z offset to apply to road vertices. Positive = raise, negative = lower.
Default: 0.0 (no change)
Returns:
Modified vertices array with offset Z values on roads.
X and Y coordinates are never modified.
"""
# Return unchanged if no offset
if offset == 0.0:
return vertices.copy()
result = vertices.copy()
n_surface_vertices = len(y_valid)
# Find which surface vertices are on roads
in_bounds = (
(y_valid >= 0) & (y_valid < road_mask.shape[0]) &
(x_valid >= 0) & (x_valid < road_mask.shape[1])
)
# Only surface vertices (with grid mappings) can be on roads
road_vertex_mask = np.zeros(n_surface_vertices, dtype=bool)
road_vertex_mask[in_bounds] = road_mask[y_valid[in_bounds], x_valid[in_bounds]] > 0.5
# Apply offset to road vertices
road_indices = np.where(road_vertex_mask)[0]
if len(road_indices) > 0:
result[road_indices, 2] += offset
return result
# =============================================================================
# DATA LAYER API
# =============================================================================
[docs]
def rasterize_roads_to_layer(
roads_geojson: Dict[str, Any],
bbox: Tuple[float, float, float, float],
resolution: float = 30.0,
road_width_pixels: int = 3,
) -> Tuple[np.ndarray, Any]:
"""
Rasterize GeoJSON roads to a layer grid with proper geographic transform.
Converts vector road data (GeoJSON LineStrings) to a raster grid where each
pixel represents road presence/type. The result includes an Affine transform
in WGS84 (EPSG:4326) coordinates for proper geographic alignment.
This is the key function that treats roads as a data layer - the output
can be added to terrain via add_data_layer() and will automatically align
to the DEM through reprojection and resampling.
Args:
roads_geojson: GeoJSON FeatureCollection with road LineStrings
bbox: Bounding box as (south, west, north, east) in WGS84 degrees
resolution: Pixel size in meters (default: 30.0). At Detroit latitude,
~30m/pixel gives good detail without excessive memory use.
road_width_pixels: Width of roads in raster pixels (default: 3). Draws roads
with thickness instead of 1-pixel lines. Use 1 for thin roads, 3-5 for
more visible roads. At 30m resolution, 3 pixels ≈ 90m visual width.
Returns:
Tuple of:
- road_grid: 2D array of uint8, values 0=no road, 1-4=road type (motorway > trunk > primary > secondary)
- road_transform: rasterio.Affine transform mapping pixels to WGS84 coordinates
"""
from rasterio import Affine
south, west, north, east = bbox
# Calculate grid dimensions based on resolution
# Convert meters to degrees (1 degree ≈ 111 km at equator, ~90 km at 45°N)
# Detroit is at ~42°N where 1 degree ≈ 84 km
meters_per_degree = 111000 * np.cos(np.radians((north + south) / 2))
pixel_size_degrees = resolution / meters_per_degree
# Calculate grid dimensions
width = int(np.ceil((east - west) / pixel_size_degrees))
height = int(np.ceil((north - south) / pixel_size_degrees))
logger.info(
f"Rasterizing roads to grid: {width}×{height} pixels "
f"({resolution}m resolution, bbox={bbox})"
)
# Create output grid
road_grid = np.zeros((height, width), dtype=np.uint8)
# Create transform: maps pixel (col, row) to geographic (lon, lat)
# Note: standard raster convention has Y decreasing (north-positive becomes row increasing)
road_transform = Affine(
pixel_size_degrees, 0, west, # pixel_width, skew_x, origin_x
0, -pixel_size_degrees, north # skew_y, pixel_height (negative = north-up), origin_y
)
logger.debug(f"Road transform: {road_transform}")
# Rasterize each road feature
processed = 0
skipped = 0
for feature in roads_geojson.get("features", []):
try:
geometry = feature.get("geometry", {})
if geometry.get("type") != "LineString":
skipped += 1
continue
# Get road type and priority
highway_type = feature.get("properties", {}).get("highway", "primary")
# Encode road type as uint8: motorway=4, trunk=3, primary=2, secondary=1, other=1
road_value = {
'motorway': 4,
'trunk': 3,
'primary': 2,
'secondary': 1,
}.get(highway_type, 1)
# Extract coordinates (WGS84 lon/lat)
coords = geometry.get("coordinates", [])
if len(coords) < 2:
skipped += 1
continue
# Convert geographic coords to pixel coordinates
road_pixels = []
for lon, lat in coords:
# Check bounds
if not (west <= lon <= east and south <= lat <= north):
continue
# Map WGS84 to pixel space
col = (lon - west) / pixel_size_degrees
row = (north - lat) / pixel_size_degrees # North-up: lat decreases = row increases
road_pixels.append((col, row))
if len(road_pixels) < 2:
skipped += 1
continue
# Draw line with width using Bresenham algorithm
for i in range(len(road_pixels) - 1):
x0, y0 = road_pixels[i]
x1, y1 = road_pixels[i + 1]
_draw_line_on_layer(road_grid, x0, y0, x1, y1, road_value, (height, width), road_width_pixels)
processed += 1
if processed % 1000 == 0:
logger.debug(f" Rasterized {processed} road segments...")
except Exception as e:
logger.debug(f"Failed to rasterize road feature: {e}")
skipped += 1
continue
logger.info(
f"Rasterized {processed}/{len(roads_geojson.get('features', []))} road segments "
f"({skipped} skipped, {np.count_nonzero(road_grid)} pixels with roads)"
)
return road_grid, road_transform
def _draw_line_on_layer(
grid: np.ndarray,
x0: float,
y0: float,
x1: float,
y1: float,
value: int,
shape: Tuple[int, int],
line_width: int = 1,
) -> None:
"""
Draw a thick line on a raster grid using Bresenham algorithm.
Args:
grid: 2D array to draw on
x0, y0, x1, y1: line endpoints in pixel coordinates
value: value to write to grid
shape: (height, width) of grid
line_width: thickness of line in pixels (default: 1). For width N,
draws a line with perpendicular thickness of N pixels.
"""
height, width = shape
# Convert to integers
x0, y0, x1, y1 = int(x0), int(y0), int(x1), int(y1)
# Bresenham line algorithm
dx = abs(x1 - x0)
dy = abs(y1 - y0)
sx = 1 if x0 < x1 else -1
sy = 1 if y0 < y1 else -1
err = dx - dy
x, y = x0, y0
half_width = line_width // 2
while True:
# Mark pixels in a cross/box pattern for thickness
if line_width == 1:
# Single pixel - fast path
if 0 <= y < height and 0 <= x < width:
grid[y, x] = max(grid[y, x], value)
else:
# Draw thick line using numpy slicing (vectorized)
# Calculate bounds with clipping to grid
y_start = max(0, y - half_width)
y_end = min(height, y + half_width + 1)
x_start = max(0, x - half_width)
x_end = min(width, x + half_width + 1)
if y_start < y_end and x_start < x_end:
grid[y_start:y_end, x_start:x_end] = np.maximum(
grid[y_start:y_end, x_start:x_end], value
)
if x == x1 and y == y1:
break
e2 = 2 * err
if e2 > -dy:
err -= dy
x += sx
if e2 < dx:
err += dx
y += sy
[docs]
def add_roads_layer(
terrain,
roads_geojson: Dict[str, Any],
bbox: Tuple[float, float, float, float],
resolution: float = 30.0,
road_width_pixels: int = 3,
) -> None:
"""
Add roads as a data layer to terrain with automatic coordinate alignment.
This is the high-level API for road integration. Roads are rasterized to
a grid with proper geographic metadata and added as a data layer. The library's
data layer pipeline ensures proper alignment even if terrain is downsampled
or reprojected.
To color roads, use the multi-overlay color mapping system::
roads_geojson = get_roads(bbox)
terrain.add_roads_layer(terrain, roads_geojson, bbox, road_width_pixels=3)
terrain.set_multi_color_mapping(
base_colormap=lambda dem: elevation_colormap(dem, 'michigan'),
base_source_layers=['dem'],
overlays=[{
'colormap': road_colormap,
'source_layers': ['roads'],
'priority': 10,
}],
)
terrain.compute_colors()
Args:
terrain: Terrain object (must have DEM data layer)
roads_geojson: GeoJSON FeatureCollection with road LineStrings
bbox: Bounding box as (south, west, north, east) in WGS84 degrees
resolution: Pixel size in meters for rasterization (default: 30.0)
road_width_pixels: Width of roads in raster pixels (default: 3). Higher values
make roads more visible. At 30m resolution, 3 pixels ≈ 90m visual width.
Raises:
ValueError: If terrain missing DEM data layer
"""
if not hasattr(terrain, "data_layers") or "dem" not in terrain.data_layers:
raise ValueError("Terrain DEM data layer not found.")
logger.info("Adding roads as data layer...")
# Rasterize roads to grid with WGS84 transform
road_grid, road_transform = rasterize_roads_to_layer(
roads_geojson, bbox, resolution, road_width_pixels
)
if not np.any(road_grid):
logger.warning("No roads were rasterized, skipping road layer")
return
# Add as data layer - library handles coordinate alignment automatically
# The target_layer="dem" ensures roads are reprojected to match DEM's grid
try:
terrain.add_data_layer(
"roads",
road_grid.astype(np.float32), # Convert to float for compatibility
road_transform,
"EPSG:4326",
target_layer="dem",
)
logger.info(f"✓ Roads added as data layer, aligned to DEM grid")
except Exception as e:
logger.error(f"Failed to add roads data layer: {e}")
raise
def _color_vertices_from_road_layer(
terrain,
colormap_name: str = "viridis",
) -> None:
"""
Color vertices based on road layer values (DEPRECATED).
This function is deprecated. Use the multi-overlay color mapping system instead::
terrain.set_multi_color_mapping(
base_colormap=...,
base_source_layers=['dem'],
overlays=[{
'colormap': road_colormap,
'source_layers': ['roads'],
'priority': 10,
}],
)
terrain.compute_colors()
Uses the transformed (downsampled/reprojected) road layer to color terrain
vertices. Road pixels are colored using a colormap based on road type.
Args:
terrain: Terrain object with aligned road layer and computed colors
colormap_name: Matplotlib colormap name (deprecated)
"""
if "roads" not in terrain.data_layers:
logger.warning("Roads layer not found in terrain")
return
roads_info = terrain.data_layers["roads"]
# Get the aligned/transformed road data (after reprojection to DEM grid)
# The add_data_layer function stores reprojected data in the layer
if roads_info.get("transformed", False) and "transformed_data" in roads_info:
road_data = roads_info["transformed_data"]
logger.info("Using transformed (aligned) road layer data")
else:
# Fall back to original if not yet transformed
road_data = roads_info.get("data")
logger.info("Using original road layer data")
if road_data is None:
logger.warning("No road data available for coloring")
return
if not hasattr(terrain, "y_valid") or not hasattr(terrain, "x_valid"):
logger.warning("Terrain missing y_valid/x_valid indices")
return
# Get valid pixel indices
y_valid = terrain.y_valid
x_valid = terrain.x_valid
if y_valid is None or x_valid is None:
logger.warning("y_valid or x_valid is None")
return
# Map road values to colors
# Road values: 0=no road, 1-4=road types
road_colormap = get_viridis_colormap()
# Normalize road values (1-4) to colormap range (0-1)
# Assign colors: motorway=0.8 (red), trunk=0.6 (orange), primary=0.4 (yellow), secondary=0.2 (green)
road_colors_map = {
0: None, # No road - don't color
1: 0.2, # Secondary - green
2: 0.4, # Primary - yellow
3: 0.6, # Trunk - orange
4: 0.8, # Motorway - red
}
height, width = road_data.shape
road_height, road_width = road_data.shape
logger.info(
f"Coloring vertices from road layer ({road_height}×{road_width}), "
f"{len(y_valid)} valid terrain vertices"
)
colored_count = 0
# For each valid vertex, check if it's on a road and apply color
for vertex_idx in range(len(y_valid)):
y, x = y_valid[vertex_idx], x_valid[vertex_idx]
# Bounds check
if not (0 <= y < road_height and 0 <= x < road_width):
continue
road_value = int(road_data[y, x])
if road_value == 0:
# No road at this location
continue
# Get color for this road type
color_norm = road_colors_map.get(road_value, None)
if color_norm is None:
continue
# Apply colormap
road_color = road_colormap(color_norm)
# Update vertex color (keep alpha)
if vertex_idx < len(terrain.colors):
terrain.colors[vertex_idx, :3] = road_color
colored_count += 1
logger.info(f"✓ Colored {colored_count} vertices on roads")