from __future__ import annotations
import os
import json
import time
from pathlib import Path
import glob
try:
import bpy
except ImportError:
bpy = None
from dataclasses import dataclass
import rasterio
from rasterio.merge import merge
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterio import Affine
from scipy.ndimage import zoom, generic_filter, sobel
import numpy as np
from scipy import ndimage
import matplotlib.pyplot as plt
from matplotlib.colors import Normalize
from math import radians
from tqdm import tqdm
from matplotlib.collections import LineCollection
import numpy as np
import shapely
from shapely.geometry import Polygon, Point
from shapely.affinity import scale
import logging
from datetime import datetime
import sys
import seaborn as sns
import geopandas as gpd
from shapely.validation import make_valid
import colorsys
from matplotlib.colors import to_rgb
from typing import Optional, Dict, Any, Tuple, Callable
import functools
import inspect
import zarr
import hashlib
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Create a console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.DEBUG)
# Create a formatter and add it to the handler
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
console_handler.setFormatter(formatter)
# Add the console handler to the logger
logger.addHandler(console_handler)
def calculate_target_vertices(
width: int,
height: int,
multiplier: float = 2.0,
) -> int:
"""
Calculate target vertex count for optimal mesh density at a given render resolution.
This helper calculates an appropriate number of vertices for terrain meshes
based on the intended output resolution. Using ~2 vertices per output pixel
ensures good detail without excessive geometry.
Args:
width: Render width in pixels
height: Render height in pixels
multiplier: Vertices per pixel (default: 2.0).
Higher values = more detail but slower renders.
- 1.0: Minimum detail (1 vertex per pixel)
- 2.0: Good balance for most renders (recommended)
- 3.0+: High detail for print or zoomed views
Returns:
int: Target vertex count for terrain mesh creation
Example:
```python
# For 1920x1080 render
target = calculate_target_vertices(1920, 1080) # ~4.1M vertices
# For print quality (3000x2400 @ 300 DPI)
target = calculate_target_vertices(3000, 2400, multiplier=2.5) # ~18M vertices
# Use with Terrain
terrain.configure_for_target_vertices(target_vertices=target)
```
"""
return int(width * height * multiplier)
def clear_scene():
"""
Clear all objects from the Blender scene.
Resets the scene to factory settings (empty scene) and removes all default
objects. Useful before importing terrain meshes to ensure a clean workspace.
Raises:
RuntimeError: If Blender module (bpy) is not available.
"""
from src.terrain.scene_setup import clear_scene as _clear_scene
return _clear_scene()
def setup_camera(camera_angle, camera_location, scale, focal_length=50, camera_type="PERSP"):
"""Configure camera for terrain visualization.
Args:
camera_angle: Tuple of (x,y,z) rotation angles in radians
camera_location: Tuple of (x,y,z) camera position
scale: Camera scale value (ortho_scale for orthographic cameras)
focal_length: Camera focal length in mm (default: 50, used only for perspective)
camera_type: Camera type 'PERSP' (perspective) or 'ORTHO' (orthographic) (default: 'PERSP')
Returns:
Camera object
Raises:
ValueError: If camera_type is not 'PERSP' or 'ORTHO'
"""
from src.terrain.scene_setup import setup_camera as _setup_camera
return _setup_camera(camera_angle, camera_location, scale, focal_length, camera_type)
def setup_light(
location=(1, 1, 2),
angle=2,
energy=3,
rotation_euler=None,
azimuth=None,
elevation=None,
):
"""Create and configure sun light for terrain visualization.
Sun position can be specified either with rotation_euler (raw Blender angles)
or with the more intuitive azimuth/elevation system:
- azimuth: Direction the sun comes FROM, in degrees clockwise from North
(0=North, 90=East, 180=South, 270=West)
- elevation: Angle above horizon in degrees (0=horizon, 90=directly overhead)
Args:
location: Tuple of (x,y,z) light position (default: (1, 1, 2))
angle: Angle of sun light in degrees (default: 2)
energy: Energy/intensity of sun light (default: 3)
rotation_euler: Tuple of (x,y,z) rotation angles in radians (default: sun from NW)
azimuth: Direction sun comes FROM in degrees (0=N, 90=E, 180=S, 270=W)
elevation: Sun angle above horizon in degrees (0=horizon, 90=overhead)
Returns:
Sun light object
"""
from src.terrain.scene_setup import setup_light as _setup_light
return _setup_light(location, angle, energy, rotation_euler, azimuth, elevation)
def setup_camera_and_light(
camera_angle,
camera_location,
scale,
sun_angle=2,
sun_energy=3,
focal_length=50,
camera_type="PERSP",
):
"""Configure camera and main light for terrain visualization.
Convenience function that calls setup_camera() and setup_light().
Args:
camera_angle: Tuple of (x,y,z) rotation angles in radians
camera_location: Tuple of (x,y,z) camera position
scale: Camera scale value (ortho_scale for orthographic cameras)
sun_angle: Angle of sun light in degrees (default: 2)
sun_energy: Energy/intensity of sun light (default: 3)
focal_length: Camera focal length in mm (default: 50, used only for perspective)
camera_type: Camera type 'PERSP' (perspective) or 'ORTHO' (orthographic) (default: 'PERSP')
Returns:
tuple: (camera object, sun light object)
"""
from src.terrain.scene_setup import setup_camera_and_light as _setup_camera_and_light
return _setup_camera_and_light(
camera_angle, camera_location, scale, sun_angle, sun_energy, focal_length, camera_type
)
def setup_two_point_lighting(
sun_azimuth: float = 225.0,
sun_elevation: float = 30.0,
sun_energy: float = 7.0,
sun_angle: float = 1.0,
sun_color: tuple = (1.0, 0.85, 0.6),
fill_azimuth: float = 45.0,
fill_elevation: float = 60.0,
fill_energy: float = 0.0,
fill_angle: float = 3.0,
fill_color: tuple = (0.7, 0.8, 1.0),
) -> list:
"""Set up two-point lighting with primary sun and optional fill light.
Creates professional-quality lighting for terrain visualization:
- Primary sun: Creates shadows and defines form (warm color by default)
- Fill light: Softens shadows, adds depth (cool color by default)
Args:
sun_azimuth: Direction sun comes FROM in degrees (0=N, 90=E, 180=S, 270=W)
sun_elevation: Sun angle above horizon in degrees (0=horizon, 90=overhead)
sun_energy: Sun light strength (default: 7.0)
sun_angle: Sun angular size in degrees (default: 1.0)
sun_color: RGB tuple for sun color (default: warm golden)
fill_azimuth: Direction fill light comes FROM in degrees
fill_elevation: Fill light angle above horizon in degrees
fill_energy: Fill light strength (default: 0.0 = no fill)
fill_angle: Fill light angular size in degrees
fill_color: RGB tuple for fill color (default: cool blue)
Returns:
List of created light objects
"""
from src.terrain.scene_setup import setup_two_point_lighting as _setup_two_point_lighting
return _setup_two_point_lighting(
sun_azimuth=sun_azimuth,
sun_elevation=sun_elevation,
sun_energy=sun_energy,
sun_angle=sun_angle,
sun_color=sun_color,
fill_azimuth=fill_azimuth,
fill_elevation=fill_elevation,
fill_energy=fill_energy,
fill_angle=fill_angle,
fill_color=fill_color,
)
def position_camera_relative(
mesh_obj,
direction="south",
distance=1.5,
elevation=0.5,
look_at="center",
camera_type="ORTHO",
sun_angle=0,
sun_energy=0,
sun_azimuth=None,
sun_elevation=None,
focal_length=50,
ortho_scale=1.2,
distance_mode="diagonal",
center_in_frame=True,
):
"""Position camera relative to mesh using intuitive cardinal directions.
Simplifies camera positioning by using natural directions (north, south, etc.)
instead of absolute Blender coordinates. The camera is automatically positioned
relative to the mesh bounds and rotated to point at the mesh center.
Args:
mesh_obj: Blender mesh object to position camera relative to
direction: Compass direction using 16-wind compass rose, one of:
Cardinal: 'north', 'south', 'east', 'west'
Primary intercardinal: 'northeast', 'southeast', 'southwest', 'northwest'
Secondary intercardinal: 'north-northeast', 'east-northeast', 'east-southeast',
'south-southeast', 'south-southwest', 'west-southwest', 'west-northwest',
'north-northwest'
Special: 'above' (directly overhead), 'above-tilted' (overhead but angled)
Default: 'south'
distance: Distance multiplier relative to mesh diagonal
(e.g., 1.5 means 1.5x mesh_diagonal away). Default: 1.5
elevation: Height as fraction of mesh diagonal added to Z position
(0.0 = ground level, 1.0 = mesh_diagonal above ground). Default: 0.5
look_at: Where camera points - 'center' to point at mesh center,
or tuple (x, y, z) for custom target. Default: 'center'
camera_type: 'ORTHO' (orthographic) or 'PERSP' (perspective). Default: 'ORTHO'
sun_angle: Angle of sun light in degrees. Default: 0 (no light)
sun_energy: Intensity of sun light. Default: 0 (no light created unless > 0)
sun_azimuth: Direction sun comes FROM in degrees (0=N, 90=E, 180=S, 270=W).
sun_elevation: Sun angle above horizon in degrees (0=horizon, 90=overhead).
focal_length: Camera focal length in mm (perspective cameras only). Default: 50
ortho_scale: Multiplier for orthographic camera scale relative to mesh diagonal.
Higher values zoom out (show more area), lower values zoom in.
Only affects orthographic cameras. Default: 1.2
distance_mode: How to interpret distance parameter ('diagonal' or 'fit'). Default: 'diagonal'
center_in_frame: If True, adjust look_at target to center the mesh's 2D projection
in the camera frame, not just point at the 3D geometric center. Default: True
Returns:
Camera object
Raises:
ValueError: If direction is not recognized or camera_type is invalid
"""
from src.terrain.scene_setup import position_camera_relative as _position_camera_relative
return _position_camera_relative(
mesh_obj=mesh_obj,
direction=direction,
distance=distance,
elevation=elevation,
look_at=look_at,
camera_type=camera_type,
sun_angle=sun_angle,
sun_energy=sun_energy,
sun_azimuth=sun_azimuth,
sun_elevation=sun_elevation,
focal_length=focal_length,
ortho_scale=ortho_scale,
distance_mode=distance_mode,
center_in_frame=center_in_frame,
)
def setup_world_atmosphere(density=0.02, scatter_color=(1, 1, 1, 1), anisotropy=0.0):
"""Set up world volume for atmospheric effects.
Args:
density: Density of the atmospheric volume (default: 0.02)
scatter_color: RGBA color tuple for scatter (default: white)
anisotropy: Direction of scatter from -1 to 1 (default: 0 for uniform)
Returns:
bpy.types.World: The configured world object
"""
from src.terrain.scene_setup import setup_world_atmosphere as _setup_world_atmosphere
return _setup_world_atmosphere(density, scatter_color, anisotropy)
def setup_hdri_lighting(
sun_elevation: float = 30.0,
sun_rotation: float = 225.0,
sun_intensity: float = 1.0,
sun_size: float = 0.545,
air_density: float = 1.0,
visible_to_camera: bool = False,
camera_background: tuple = None,
sky_strength: float = None,
):
"""Set up HDRI-style sky lighting using Blender's Nishita sky model.
Creates realistic sky lighting that contributes to ambient illumination
without being visible in the final render (by default).
Args:
sun_elevation: Sun elevation angle in degrees (0=horizon, 90=overhead)
sun_rotation: Sun rotation/azimuth in degrees (0=front, 180=back)
sun_intensity: Multiplier for sun disc brightness in sky texture (default: 1.0)
sun_size: Angular diameter of sun disc in degrees (default: 0.545 = real sun).
Larger values create softer shadows, smaller values create sharper.
air_density: Atmospheric density (default: 1.0, higher=hazier)
visible_to_camera: If False, sky is invisible but still lights scene
camera_background: RGB tuple for background when sky invisible.
Use (0.9, 0.9, 0.9) for atmosphere to work.
sky_strength: Overall sky emission strength (ambient light level).
If None, defaults to sun_intensity for backwards compatibility.
Returns:
bpy.types.World: The configured world object
"""
from src.terrain.scene_setup import setup_hdri_lighting as _setup_hdri_lighting
return _setup_hdri_lighting(
sun_elevation=sun_elevation,
sun_rotation=sun_rotation,
sun_intensity=sun_intensity,
sun_size=sun_size,
air_density=air_density,
visible_to_camera=visible_to_camera,
camera_background=camera_background,
sky_strength=sky_strength,
)
def setup_render_settings(
use_gpu: bool = True,
samples: int = 128,
preview_samples: int = 32,
use_denoising: bool = True,
denoiser: str = "OPTIX",
compute_device: str = "OPTIX",
use_persistent_data: bool = False,
use_auto_tile: bool = False,
tile_size: int = 2048,
) -> None:
"""
Configure Blender render settings for high-quality terrain visualization.
Args:
use_gpu: Whether to use GPU acceleration
samples: Number of render samples
preview_samples: Number of viewport preview samples
use_denoising: Whether to enable denoising
denoiser: Type of denoiser to use ('OPTIX', 'OPENIMAGEDENOISE', 'NLM')
compute_device: Compute device type ('OPTIX', 'CUDA', 'HIP', 'METAL')
use_persistent_data: Keep scene data in memory between frames (default: False)
use_auto_tile: Enable automatic tiling for large renders (default: False).
Splits large images into smaller GPU-friendly tiles to reduce VRAM usage.
Essential for print-quality renders (3000x2400+ pixels).
tile_size: Tile size in pixels when auto_tile is enabled (default: 2048).
Smaller tiles = less VRAM but slower rendering. Try 512-1024 for limited VRAM.
"""
from src.terrain.rendering import setup_render_settings as _setup_render_settings
return _setup_render_settings(
use_gpu=use_gpu,
samples=samples,
preview_samples=preview_samples,
use_denoising=use_denoising,
denoiser=denoiser,
compute_device=compute_device,
use_persistent_data=use_persistent_data,
use_auto_tile=use_auto_tile,
tile_size=tile_size,
)
def render_scene_to_file(
output_path,
width=1920,
height=1440,
file_format="PNG",
color_mode="RGBA",
compression=90,
save_blend_file=True,
show_progress=True,
max_retries=3,
retry_delay=5.0,
):
"""
Render the current Blender scene to file.
Includes automatic retry logic for GPU memory errors. If rendering fails
due to CUDA/GPU memory exhaustion, the function will wait and retry up to
max_retries times before giving up.
Args:
output_path (str or Path): Path where output file will be saved
width (int): Render width in pixels (default: 1920)
height (int): Render height in pixels (default: 1440)
file_format (str): Output format 'PNG', 'JPEG', etc. (default: 'PNG')
color_mode (str): 'RGBA' or 'RGB' (default: 'RGBA')
compression (int): PNG compression level 0-100 (default: 90)
save_blend_file (bool): Also save .blend project file (default: True)
show_progress (bool): Show render progress updates (default: True).
Logs elapsed time every 5 seconds during rendering.
max_retries (int): Maximum number of retry attempts for GPU memory
errors (default: 3). Set to 0 to disable retries.
retry_delay (float): Seconds to wait between retry attempts (default: 5.0).
Allows GPU memory to be freed by other processes.
Returns:
Path: Path to rendered file if successful, None otherwise
"""
from src.terrain.rendering import render_scene_to_file as _render_scene_to_file
return _render_scene_to_file(
output_path=output_path,
width=width,
height=height,
file_format=file_format,
color_mode=color_mode,
compression=compression,
save_blend_file=save_blend_file,
show_progress=show_progress,
max_retries=max_retries,
retry_delay=retry_delay,
)
def get_render_settings_report() -> dict:
"""
Query Blender for the actual render settings used.
Returns a dictionary of all render-relevant settings, useful for
debugging, reproducibility, and verification.
Returns:
dict: Dictionary containing all render settings from Blender
"""
from src.terrain.rendering import get_render_settings_report as _get_render_settings_report
return _get_render_settings_report()
def print_render_settings_report(log=None) -> None:
"""
Print a formatted report of all Blender render settings.
Queries Blender for actual settings and prints them in a readable format.
Useful for debugging and ensuring settings are correctly applied.
Args:
log: Logger to use (defaults to module logger)
"""
from src.terrain.rendering import print_render_settings_report as _print_render_settings_report
return _print_render_settings_report(log)
def apply_colormap_material(material: bpy.types.Material) -> None:
"""
Create a simple material setup for terrain visualization using vertex colors.
Uses emission to guarantee colors are visible regardless of lighting.
Args:
material: Blender material to configure
"""
from src.terrain.materials import apply_colormap_material as _apply_colormap_material
return _apply_colormap_material(material)
def apply_water_shader(
material: bpy.types.Material, water_color: Tuple[float, float, float] = (0.0, 0.153, 0.298)
) -> None:
"""
Apply water shader to material, coloring water areas based on vertex alpha channel.
Uses alpha channel to mix between water color and elevation colors.
Water pixels (alpha=1.0) render as water color; land pixels (alpha=0.0) show elevation colors.
Args:
material: Blender material to configure
water_color: RGB tuple for water (default: University of Michigan blue #00274C)
"""
from src.terrain.materials import apply_water_shader as _apply_water_shader
return _apply_water_shader(material, water_color)
def create_background_plane(
terrain_obj: bpy.types.Object,
depth: float = -2.0,
scale_factor: float = 2.0,
material_params: Optional[Dict] = None,
) -> bpy.types.Object:
"""
Create a large emissive plane beneath the terrain for background illumination.
Args:
terrain_obj: The terrain Blender object used for size reference
depth: Z-coordinate for the plane position
scale_factor: Scale multiplier for plane size relative to terrain
material_params: Optional dict to override default material parameters
Returns:
bpy.types.Object: The created background plane object
Raises:
ValueError: If terrain_obj is None or has invalid bounds
RuntimeError: If mesh or material creation fails
"""
from src.terrain.materials import create_background_plane as _create_background_plane
return _create_background_plane(terrain_obj, depth, scale_factor, material_params)
[docs]
def load_dem_files(
directory_path: str, pattern: str = "*.hgt", recursive: bool = False
) -> tuple[np.ndarray, rasterio.Affine]:
"""
Load and merge DEM files from a directory into a single elevation dataset.
Supports any raster format readable by rasterio (HGT, GeoTIFF, etc.).
Args:
directory_path: Path to directory containing DEM files
pattern: File pattern to match (default: ``*.hgt``)
recursive: Whether to search subdirectories recursively (default: False)
Returns:
tuple: (merged_dem, transform) where:
- merged_dem: numpy array containing the merged elevation data
- transform: affine transform mapping pixel to geographic coordinates
Raises:
ValueError: If no valid DEM files are found or directory doesn't exist
OSError: If directory access fails or file reading fails
rasterio.errors.RasterioIOError: If there are issues reading the DEM files
"""
from src.terrain.data_loading import load_dem_files as _load_dem_files
return _load_dem_files(directory_path, pattern, recursive)
def load_filtered_hgt_files(
dem_dir,
min_latitude: int = None,
max_latitude: int = None,
min_longitude: int = None,
max_longitude: int = None,
bbox: tuple = None,
pattern: str = "*.hgt",
):
"""
Load SRTM HGT files filtered by latitude/longitude range or bounding box.
Filters files before loading to reduce memory usage for large DEM directories.
Args:
dem_dir: Directory containing HGT files
min_latitude: Southern bound (e.g., 41 for N41)
max_latitude: Northern bound (e.g., 47 for N47)
min_longitude: Western bound (e.g., -90 for W090)
max_longitude: Eastern bound (e.g., -82 for W082)
bbox: Bounding box as (west, south, east, north). Overrides individual params.
pattern: File pattern (default: "*.hgt")
Returns:
Tuple of (merged_dem, transform)
Example:
>>> # Load Detroit area using bbox
>>> dem, transform = load_filtered_hgt_files(
... "data/dem/detroit",
... bbox=(-84, 41, -82, 43)
... )
"""
from src.terrain.data_loading import load_filtered_hgt_files as _load_filtered_hgt_files
return _load_filtered_hgt_files(
dem_dir,
min_latitude=min_latitude,
max_latitude=max_latitude,
min_longitude=min_longitude,
max_longitude=max_longitude,
bbox=bbox,
pattern=pattern,
)
def reproject_raster(src_crs="EPSG:4326", dst_crs="EPSG:32617", nodata_value=np.nan, num_threads=4):
"""
Generalized raster reprojection function
Args:
src_crs: Source coordinate reference system
dst_crs: Destination coordinate reference system
nodata_value: Value to use for areas outside original data
num_threads: Number of threads for parallel processing
Returns:
Function that transforms data and returns (data, transform, new_crs)
"""
from src.terrain.transforms import reproject_raster as _reproject_raster
return _reproject_raster(src_crs, dst_crs, nodata_value, num_threads)
def downsample_raster(zoom_factor=0.1, method="average", nodata_value=np.nan, optimized=True):
"""
Create a raster downsampling transform function with specified parameters.
For large compression ratios (>100:1), automatically uses two-pass downsampling
for improved performance (expected ~35x speedup on billion-pixel DEMs).
Args:
zoom_factor: Scaling factor for downsampling (default: 0.1)
method: Downsampling method (default: "average")
- "average": Area averaging - best for DEMs, no overshoot
- "lanczos": Lanczos resampling - sharp, minimal aliasing
- "cubic": Cubic spline interpolation
- "bilinear": Bilinear interpolation - safe fallback
nodata_value: Value to treat as no data (default: np.nan)
optimized: Use two-pass optimization for large compressions (default: True)
Returns:
function: A transform function that downsamples raster data
"""
from src.terrain.transforms import (
downsample_raster as _downsample_raster,
downsample_raster_optimized as _downsample_raster_optimized,
)
if optimized:
return _downsample_raster_optimized(zoom_factor, method, nodata_value)
else:
return _downsample_raster(zoom_factor, method, nodata_value)
def smooth_raster(window_size=None, nodata_value=np.nan):
"""
Create a raster smoothing transform function with specified parameters.
Args:
window_size: Size of median filter window
(defaults to 5% of smallest dimension if None)
nodata_value: Value to treat as no data (default: np.nan)
Returns:
function: A transform function that smooths raster data
"""
from src.terrain.transforms import smooth_raster as _smooth_raster
return _smooth_raster(window_size, nodata_value)
def flip_raster(axis="horizontal"):
"""
Create a transform function that mirrors (flips) the DEM data.
If axis='horizontal', it flips top ↔ bottom.
(In terms of rows, row=0 becomes row=(height-1).)
If axis='vertical', you could do left ↔ right (np.fliplr).
"""
from src.terrain.transforms import flip_raster as _flip_raster
return _flip_raster(axis)
def scale_elevation(scale_factor=1.0, nodata_value=np.nan):
"""
Create a raster elevation scaling transform function.
Multiplies all elevation values by the scale factor. Useful for reducing
or amplifying terrain height without changing horizontal scale.
Args:
scale_factor (float): Multiplication factor for elevation values (default: 1.0)
nodata_value: Value to treat as no data (default: np.nan)
Returns:
function: A transform function that scales elevation data
"""
from src.terrain.transforms import scale_elevation as _scale_elevation
return _scale_elevation(scale_factor, nodata_value)
def feature_preserving_smooth(sigma_spatial=3.0, sigma_intensity=None, nodata_value=np.nan):
"""
Create a feature-preserving smoothing transform using bilateral filtering.
Removes high-frequency noise while preserving ridges, valleys, and drainage patterns.
Uses bilateral filtering: spatial Gaussian weighted by intensity similarity.
Args:
sigma_spatial: Spatial smoothing extent in pixels (default: 3.0).
Larger = more smoothing. Typical range: 1-10 pixels.
sigma_intensity: Intensity similarity threshold in elevation units.
Larger = more smoothing across elevation differences.
If None, auto-calculated as 5% of elevation range.
nodata_value: Value to treat as no data (default: np.nan)
Returns:
function: Transform function compatible with terrain.add_transform()
"""
from src.terrain.transforms import feature_preserving_smooth as _feature_preserving_smooth
return _feature_preserving_smooth(sigma_spatial, sigma_intensity, nodata_value)
def smooth_score_data(scores, sigma_spatial=3.0, sigma_intensity=None):
"""
Smooth score data using bilateral filtering.
Applies feature-preserving smoothing to reduce blocky pixelation from
low-resolution source data (e.g., SNODAS ~925m) when displayed on
high-resolution terrain (~30m DEM).
Uses bilateral filtering: smooths within similar-intensity regions while
preserving edges between different score zones.
Args:
scores: 2D numpy array of score values (typically 0-1 range)
sigma_spatial: Spatial smoothing extent in pixels (default: 3.0).
Larger = more smoothing. Typical range: 1-10 pixels.
sigma_intensity: Intensity similarity threshold in score units.
Larger = more smoothing across score differences.
If None, auto-calculated as 15% of score range (good for 0-1 data).
Returns:
Smoothed score array with same shape as input.
NaN values are preserved. Output is clipped to [0, 1] range.
Example:
>>> # Smooth blocky SNODAS-derived scores
>>> sledding_scores = load_score_grid("sledding_scores.npz")
>>> smoothed = smooth_score_data(sledding_scores, sigma_spatial=5.0)
"""
from src.terrain.transforms import smooth_score_data as _smooth_score_data
return _smooth_score_data(scores, sigma_spatial, sigma_intensity)
def slope_colormap(slopes, cmap_name="terrain", min_slope=0, max_slope=45):
"""
Create a simple colormap based solely on terrain slopes.
Args:
slopes: Array of slope values in degrees
cmap_name: Matplotlib colormap name (default: 'terrain')
min_slope: Minimum slope value for normalization (default: 0)
max_slope: Maximum slope value for normalization (default: 45)
Returns:
Array of RGBA colors with shape (*slopes.shape, 4)
"""
from src.terrain.color_mapping import slope_colormap as _slope_colormap
return _slope_colormap(slopes, cmap_name, min_slope, max_slope)
def elevation_colormap(dem_data, cmap_name="viridis", min_elev=None, max_elev=None, gamma=1.0):
"""
Create a colormap based on elevation values.
Maps elevation data to colors using a matplotlib colormap.
Low elevations map to the start of the colormap, high elevations to the end.
Args:
dem_data: 2D numpy array of elevation values
cmap_name: Matplotlib colormap name (default: 'viridis')
min_elev: Minimum elevation for normalization (default: use data min)
max_elev: Maximum elevation for normalization (default: use data max)
gamma: Gamma correction exponent (default: 1.0 = no correction).
Values < 1.0 brighten midtones, values > 1.0 darken midtones.
Returns:
Array of RGB colors with shape (height, width, 3) as uint8
"""
from src.terrain.color_mapping import elevation_colormap as _elevation_colormap
return _elevation_colormap(dem_data, cmap_name, min_elev, max_elev, gamma)
def transform_wrapper(transform_func):
"""
Standardize transform function interface with consistent output
Args:
transform_func: The original transform function to wrap
Returns:
A wrapped function with consistent signature and return format
"""
@functools.wraps(transform_func)
def wrapped_transform(data: np.ndarray, transform: rasterio.Affine = None) -> tuple:
"""
Standardized transform wrapper with consistent signature
Args:
data: Input numpy array to transform
transform: Optional affine transform
Returns:
Tuple of (transformed_data, transform, [crs]) where CRS is optional
"""
# Inspect function signature to determine how to call
sig = inspect.signature(transform_func)
params = list(sig.parameters.keys())
try:
# Initialize result variables
transformed_data = None
final_transform = transform
crs = None
# Case 1: Transform takes only data
if len(params) == 1 and params[0] == "data":
transformed_data = transform_func(data)
# Case 2: Transform takes (data, transform)
elif len(params) == 2 and params[0] == "data" and params[1] == "transform":
result = transform_func(data, transform)
# Handle different return types
if isinstance(result, tuple):
if len(result) == 3:
# When transform returns (data, transform, crs)
transformed_data, final_transform, crs = result
elif len(result) == 2:
# When transform returns (data, transform)
transformed_data, final_transform = result
else:
transformed_data = result[0]
else:
transformed_data = result
# Case 3: More complex signature or other parameters
else:
result = transform_func(data, transform)
# Handle different return types
if isinstance(result, tuple):
if len(result) == 3:
# When transform returns (data, transform, crs)
transformed_data, final_transform, crs = result
elif len(result) == 2:
# When transform returns (data, transform)
transformed_data, final_transform = result
else:
transformed_data = result[0]
else:
transformed_data = result
# Return standardized format: always a 3-tuple with optional None for crs
return transformed_data, final_transform, crs
except Exception as e:
logger = logging.getLogger(__name__)
logger.error(f"Error in transform {transform_func.__name__}: {e}")
raise
return wrapped_transform
class TerrainCache:
"""
Cache manager for terrain data processing results.
Handles persistent storage and retrieval of transformed terrain data layers
as GeoTIFF files with geographic metadata. Supports loading and saving with
coordinate reference system (CRS) and custom metadata.
Attributes:
cache_dir (Path): Root directory for cached GeoTIFF files.
logger (logging.Logger): Logger instance for cache operations.
Examples:
>>> cache = TerrainCache('my_cache_dir')
>>> cache.save('dem_transformed', dem_array, transform, 'EPSG:32617')
>>> data, transform, crs = cache.load('dem_transformed')
"""
def __init__(self, cache_dir: str = "terrain_cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(parents=True, exist_ok=True)
self.logger = logging.getLogger(__name__)
def get_target_path(self, target_name: str) -> Path:
"""Get path for a specific target"""
return self.cache_dir / f"{target_name}.tif"
def exists(self, target_name: str) -> bool:
"""Check if target exists"""
return self.get_target_path(target_name).exists()
def save(self, target_name: str, data: np.ndarray, transform, crs="EPSG:4326", metadata=None):
"""Save data as GeoTIFF with CRS and metadata"""
path = self.get_target_path(target_name)
self.logger.info(f"Saving {target_name}")
# Save main raster data
with rasterio.open(
path,
"w",
driver="GTiff",
height=data.shape[0],
width=data.shape[1],
count=1,
dtype=data.dtype,
crs=crs,
transform=transform,
compress="lzw",
) as dst:
dst.write(data, 1)
# Save metadata if provided
if metadata:
# Convert any non-string values to strings for GDAL metadata
string_metadata = {k: str(v) for k, v in metadata.items()}
dst.update_tags(**string_metadata)
# For more complex metadata that can't be stored in GDAL tags
if metadata:
# Save to a companion JSON file
meta_path = path.with_suffix(".json")
with open(meta_path, "w") as f:
json.dump(metadata, f)
def load(self, target_name: str) -> Optional[Dict[str, Any]]:
"""Load GeoTIFF and metadata if it exists"""
path = self.get_target_path(target_name)
if not path.exists():
return None
self.logger.info(f"Loading {target_name}")
# Load raster data
with rasterio.open(path) as src:
data = src.read(1)
transform = src.transform
crs = src.crs
# Get basic metadata from tags
basic_metadata = src.tags()
# Try to load companion metadata file
meta_path = path.with_suffix(".json")
full_metadata = basic_metadata.copy()
if meta_path.exists():
try:
with open(meta_path) as f:
full_metadata.update(json.load(f))
except (json.JSONDecodeError, OSError) as e:
self.logger.warning(f"Failed to load metadata file {meta_path}: {e}")
return {"data": data, "transform": transform, "crs": crs, "metadata": full_metadata}
[docs]
class Terrain:
"""
Core class for managing Digital Elevation Model (DEM) data and terrain operations.
Handles loading, transforming, and visualizing terrain data from raster sources.
Supports coordinate reprojection, downsampling, color mapping, and 3D mesh generation
for Blender visualization. Uses efficient caching to avoid recomputation of transforms.
Attributes:
dem_shape (tuple): Shape of DEM array as (height, width).
dem_transform (rasterio.Affine): Affine transform for geographic coordinates.
data_layers (dict): Dictionary of data layers (DEM, overlays, derived data).
transforms (list): List of transform functions to apply.
vertices (np.ndarray): Vertex positions for generated mesh.
vertex_colors (np.ndarray): RGBA colors for mesh vertices.
Examples:
>>> dem_data = np.random.rand(100, 100) * 1000
>>> transform = rasterio.Affine.identity()
>>> terrain = Terrain(dem_data, transform, dem_crs='EPSG:4326')
>>> terrain.apply_transforms()
>>> mesh = terrain.create_mesh(scale_factor=100.0)
"""
[docs]
def __init__(
self,
dem_data: np.ndarray,
dem_transform: rasterio.Affine,
dem_crs: str = "EPSG:4326",
cache_dir: str = "terrain_cache",
logger: Optional[logging.Logger] = None,
) -> None:
"""
Initialize terrain from DEM data.
Args:
dem_data (np.ndarray): DEM array of shape (height, width) containing elevation values.
Integer types are converted to float32. Must be 2D.
dem_transform (rasterio.Affine): Affine transform mapping pixel coordinates to
geographic coordinates.
dem_crs (str): Coordinate reference system in EPSG format (default: 'EPSG:4326').
cache_dir (str): Directory for caching computations (default: 'terrain_cache').
logger (logging.Logger, optional): Logger instance for diagnostic output.
Raises:
TypeError: If dem_data is not a numpy array or has unsupported dtype.
ValueError: If dem_data is not 2D.
"""
self.logger = logger or logging.getLogger(__name__)
self.logger.info("Initializing Terrain Cache")
self.cache = TerrainCache(cache_dir)
self.logger.info(f"Cache directory contents: {list(self.cache.cache_dir.glob('**/*'))}")
self.logger.info("Initializing Terrain...")
# Validate input
if not isinstance(dem_data, np.ndarray):
raise TypeError("dem_data must be a numpy array")
if dem_data.ndim != 2:
raise ValueError(f"dem_data must be 2D, got shape {dem_data.shape}")
# Convert to float32 if needed
if np.issubdtype(dem_data.dtype, np.integer):
self.logger.info(f"Converting DEM data from {dem_data.dtype} to float32")
dem_data = dem_data.astype(np.float32)
elif not np.issubdtype(dem_data.dtype, np.floating):
raise TypeError(f"Unsupported DEM data type: {dem_data.dtype}")
# Store original DEM data and transform
self.dem_transform = dem_transform
self.dem_bounds = rasterio.transform.array_bounds(
dem_data.shape[0], dem_data.shape[1], dem_transform
)
# Calculate resolution in meters
self.resolution = (abs(dem_transform[0]) * 111320, abs(dem_transform[4]) * 111320)
self.dem_shape = dem_data.shape
# Initialize list of transforms and data layers
self.transforms = []
self.data_layers = {}
# Track cumulative transform metadata (e.g., elevation scale factors)
# This allows algorithms to compensate for prior transforms
self.transform_metadata = {
"elevation_scale": 1.0, # Cumulative scale factor applied to elevation
}
# Initialize containers for processed data
self.processed_dem = None # Will hold transformed DEM data
self.vertices = None # Will hold final vertex positions
self.faces = None # Will hold face indices
self.vertex_colors = None # Will hold vertex colors
self.add_data_layer("dem", dem_data, dem_transform, dem_crs)
self.logger.info(f"Terrain initialized with DEM data:")
self.logger.info(f" Shape: {dem_data.shape}")
self.logger.info(f" Resolution: {self.resolution[0]:.2f}m x {self.resolution[1]:.2f}m")
self.logger.info(f" Value range: {np.nanmin(dem_data):.2f} to {np.nanmax(dem_data):.2f}")
[docs]
def visualize_dem(
self,
layer: str = "dem",
use_transformed: bool = False,
title: str = None,
cmap: str = "terrain",
percentile_clip: bool = True,
clip_percentiles: tuple = (1, 99),
max_pixels: int = 500_000,
show_histogram: bool = True,
) -> None:
"""
Create diagnostic visualization of any terrain data layer.
Args:
layer: Name of data layer to visualize (default: 'dem')
use_transformed: Whether to use transformed or original data (default: False)
title: Plot title (default: auto-generated based on layer)
cmap: Matplotlib colormap
percentile_clip: Whether to clip extreme values
clip_percentiles: Tuple of (min, max) percentiles to clip (default: (1, 99))
max_pixels: Maximum number of pixels for subsampling
show_histogram: Whether to show the histogram panel (default: True)
"""
self.logger.info(f"Creating visualization for layer '{layer}'")
# Validate requested layer exists
if layer not in self.data_layers:
available_layers = list(self.data_layers.keys())
raise ValueError(f"Layer '{layer}' not found. Available layers: {available_layers}")
layer_info = self.data_layers[layer]
# Determine which data to use (transformed or original)
if use_transformed and not layer_info.get("transformed", False):
self.logger.warning(
f"Transformed data requested for layer '{layer}' but not available. Using original."
)
use_transformed = False
if use_transformed:
plot_data = layer_info["transformed_data"]
data_transform = layer_info["transformed_transform"]
data_crs = layer_info["transformed_crs"]
self.logger.info(f"Using transformed data for layer '{layer}'")
else:
plot_data = layer_info["data"]
data_transform = layer_info["transform"]
data_crs = layer_info["crs"]
self.logger.info(f"Using original data for layer '{layer}'")
# Generate title if not provided
if title is None:
transform_status = "Transformed" if use_transformed else "Original"
title = f"{transform_status} {layer.capitalize()} Layer Visualization"
# Remove NaN for calculations
valid_data = plot_data[~np.isnan(plot_data)]
if len(valid_data) == 0:
self.logger.error(f"Layer '{layer}' contains no valid data (all NaN)")
return
# Logging basic statistics
self.logger.info("Data Statistics:")
self.logger.info(f" Shape: {plot_data.shape}")
self.logger.info(f" Min Value: {valid_data.min():.4f}")
self.logger.info(f" Max Value: {valid_data.max():.4f}")
self.logger.info(f" Mean Value: {valid_data.mean():.4f}")
self.logger.info(f" Median Value: {np.median(valid_data):.4f}")
# NaN analysis
nan_percentage = np.isnan(plot_data).mean() * 100
self.logger.info(f" NaN Percentage: {nan_percentage:.2f}%")
# Subsampling to prevent memory issues
def sample_array(arr):
"""Downsample array for visualization if it exceeds max_pixels limit."""
total_pixels = arr.size
if total_pixels <= max_pixels:
return arr
sample_rate = int(np.sqrt(total_pixels / max_pixels))
self.logger.info(f"Subsampling with rate 1/{sample_rate} for visualization")
return arr[::sample_rate, ::sample_rate]
sampled_data = sample_array(plot_data)
# Determine color scaling
if percentile_clip:
min_percentile, max_percentile = clip_percentiles
vmin = np.percentile(valid_data, min_percentile)
vmax = np.percentile(valid_data, max_percentile)
self.logger.info(
f"Clipping to {min_percentile}-{max_percentile} percentiles: [{vmin:.4f}, {vmax:.4f}]"
)
else:
vmin, vmax = valid_data.min(), valid_data.max()
# Determine plot layout
if show_histogram:
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 8))
fig.suptitle(title, fontsize=16)
# Main data heatmap
im = ax1.imshow(sampled_data, cmap=cmap, vmin=vmin, vmax=vmax, interpolation="nearest")
ax1.set_title(f"{layer.capitalize()} Visualization")
ax1.set_xlabel("Column Index")
ax1.set_ylabel("Row Index")
plt.colorbar(im, ax=ax1, shrink=0.8, label=layer.capitalize())
# Value distribution histogram
ax2.hist(valid_data, bins=50, color="skyblue", alpha=0.7)
ax2.set_title(f"{layer.capitalize()} Distribution")
ax2.set_xlabel("Value")
ax2.set_ylabel("Frequency")
# Add grid lines
ax1.grid(False)
ax2.grid(True, alpha=0.3)
else:
# Simple single plot with just the heatmap
fig, ax = plt.subplots(figsize=(10, 8))
fig.suptitle(title, fontsize=16)
im = ax.imshow(sampled_data, cmap=cmap, vmin=vmin, vmax=vmax, interpolation="nearest")
ax.set_title(f"{layer.capitalize()} Visualization")
ax.set_xlabel("Column Index")
ax.set_ylabel("Row Index")
plt.colorbar(im, ax=ax, shrink=0.8, label=layer.capitalize())
# Add data source and transform info in footer
transform_str = f"Transform: [{data_transform[0]:.6f}, {data_transform[1]:.6f}, {data_transform[2]:.6f}, {data_transform[3]:.6f}, {data_transform[4]:.6f}, {data_transform[5]:.6f}]"
plt.figtext(0.5, 0.01, f"CRS: {data_crs} | {transform_str}", ha="center", fontsize=8)
plt.tight_layout()
plt.subplots_adjust(bottom=0.05) # Make room for footer
plt.show()
self.logger.info("Visualization complete.")
[docs]
def add_data_layer(
self,
name: str,
data: np.ndarray,
transform: Optional[rasterio.Affine] = None,
crs: Optional[str] = None,
target_crs: Optional[str] = None,
target_layer: Optional[str] = None,
same_extent_as: Optional[str] = None,
resampling: Resampling = Resampling.bilinear,
nodata: Optional[float] = None,
) -> None:
"""
Add a data layer, optionally reprojecting to match another layer.
Stores data with geographic metadata (CRS and transform). Can automatically
reproject and resample to match an existing layer's grid for multi-layer analysis.
Args:
name (str): Unique name for this data layer (e.g., 'dem', 'elevation', 'slope').
data (np.ndarray): 2D array of data values, shape (height, width).
transform (rasterio.Affine, optional): Affine transform mapping pixel to geographic
coords. Required unless same_extent_as is specified.
crs (str, optional): Coordinate reference system in EPSG format (e.g., 'EPSG:4326').
Required unless same_extent_as is specified (inherits from reference layer).
target_crs (str, optional): Target CRS to reproject to. If None and target_layer
specified, uses target layer's CRS. If None and no target, uses input crs.
target_layer (str, optional): Name of existing layer to match grid and CRS.
If specified, data is automatically reprojected and resampled to align.
same_extent_as (str, optional): Name of existing layer whose geographic extent
this data covers. When specified, transform and CRS are automatically
calculated from the reference layer's bounds and the data's shape.
This is useful when score grids or overlays cover the same area as the
DEM but at different resolutions. Implies target_layer if not specified.
resampling (rasterio.enums.Resampling): Resampling method for reprojection
(default: Resampling.bilinear). See rasterio docs for options.
nodata (float, optional): No-data sentinel value used during reprojection.
Source pixels with this value are treated as missing and won't influence
bilinear interpolation neighbours. Destination pixels with no source
coverage are filled with this value. Defaults to ``np.nan`` for
floating-point arrays and ``0`` for integer arrays.
Returns:
None: Modifies internal data_layers dictionary.
Raises:
KeyError: If target_layer or same_extent_as layer doesn't exist.
ValueError: If neither transform nor same_extent_as is provided.
Examples:
>>> # Add elevation data with native CRS
>>> terrain.add_data_layer('dem', dem_array, transform, 'EPSG:4326')
>>> # Add overlay data, reproject to match DEM
>>> terrain.add_data_layer('landcover', lc_array, lc_transform, 'EPSG:3857',
... target_layer='dem')
>>> # Add score data that covers the same extent as DEM (automatic transform)
>>> terrain.add_data_layer('score', score_array, same_extent_as='dem')
>>> # Use nearest-neighbor for categorical data
>>> terrain.add_data_layer('zones', zone_array, zone_transform, 'EPSG:4326',
... target_layer='dem', resampling=Resampling.nearest)
"""
self.logger.info(f"Adding data layer '{name}'")
# Handle same_extent_as: calculate transform from reference layer's bounds
if same_extent_as is not None:
if same_extent_as not in self.data_layers:
raise KeyError(f"Reference layer '{same_extent_as}' not found for same_extent_as")
ref_info = self.data_layers[same_extent_as]
# Use ORIGINAL extent and CRS (before transforms) since the source data
# typically covers the same geographic area as the original reference layer.
# The reprojection will handle coordinate transformation.
ref_data = ref_info["data"]
ref_transform = ref_info["transform"]
ref_crs = ref_info["crs"]
# Calculate geographic bounds of reference layer
ref_height, ref_width = ref_data.shape
# Top-left corner
x_origin = ref_transform.c
y_origin = ref_transform.f
# Bottom-right corner
x_end = x_origin + ref_transform.a * ref_width
y_end = y_origin + ref_transform.e * ref_height
# Calculate pixel size for source data to cover same extent
src_height, src_width = data.shape
pixel_width = (x_end - x_origin) / src_width
pixel_height = (y_end - y_origin) / src_height
# Create transform for source data
transform = rasterio.Affine(pixel_width, 0, x_origin, 0, pixel_height, y_origin)
crs = ref_crs
self.logger.info(
f"Calculated transform from '{same_extent_as}' extent: "
f"origin=({x_origin:.4f}, {y_origin:.4f}), "
f"pixel=({pixel_width:.6f}, {pixel_height:.6f})"
)
# If target_layer not specified, use same_extent_as as target
if target_layer is None:
target_layer = same_extent_as
# Validate that transform and crs are provided (either directly or via same_extent_as)
if transform is None:
raise ValueError("transform is required (or use same_extent_as to calculate automatically)")
if crs is None:
raise ValueError("crs is required (or use same_extent_as to inherit from reference layer)")
# Store target_layer reference for post-transform alignment
target_layer_ref = target_layer
# Determine target CRS and transform
if target_layer is not None:
if target_layer not in self.data_layers:
raise KeyError(f"Target layer '{target_layer}' not found")
target_info = self.data_layers[target_layer]
# Use transformed data dimensions if transforms have been applied,
# otherwise fall back to original dimensions. This ensures data layers
# added after downsampling are automatically resampled to match the
# actual mesh dimensions.
if target_info.get("transformed", False) and "transformed_data" in target_info:
target_shape = target_info["transformed_data"].shape
target_transform = target_info.get(
"transformed_transform", target_info["transform"]
)
target_crs = target_info.get("transformed_crs", target_info["crs"])
self.logger.debug(
f"Using transformed target shape {target_shape} for layer alignment"
)
else:
target_shape = target_info["data"].shape
target_transform = target_info["transform"]
target_crs = target_info["crs"]
elif target_crs is not None:
# If target_crs provided but no reference layer, we need a reference layer
if not self.data_layers:
raise ValueError("Cannot determine target grid without reference layer")
# Use first layer as reference for grid
reference_layer = next(iter(self.data_layers.values()))
# Use transformed dimensions if available
if reference_layer.get("transformed", False) and "transformed_data" in reference_layer:
target_transform = reference_layer.get(
"transformed_transform", reference_layer["transform"]
)
target_shape = reference_layer["transformed_data"].shape
else:
target_transform = reference_layer["transform"]
target_shape = reference_layer["data"].shape
else:
# If no target specified, keep original
self.data_layers[name] = {
"data": data,
"transform": transform,
"crs": crs,
"transformed": False,
"target_layer": None,
}
self.logger.info(f"Added layer '{name}' with original CRS {crs}")
return
# Resolve nodata value: explicit arg > auto-detect from dtype
if nodata is None:
nodata_value = np.nan if np.issubdtype(data.dtype, np.floating) else 0
else:
nodata_value = nodata
# Create target array and reproject if needed
# Note: Affine.__ne__ returns array, so use tuple comparison instead
transforms_differ = (crs != target_crs) or (tuple(transform) != tuple(target_transform))
if transforms_differ:
self.logger.info(f"Reprojecting from {crs} to {target_crs}")
self.logger.info(f"Transforms: {transform} to {target_transform}")
# Initialise with nodata so uncovered destination pixels don't become 0
aligned_data = np.full(target_shape, nodata_value, dtype=data.dtype)
try:
reproject(
data,
aligned_data,
src_transform=transform,
src_crs=crs,
dst_transform=target_transform,
dst_crs=target_crs,
resampling=resampling,
src_nodata=nodata_value,
dst_nodata=nodata_value,
)
# Store reprojected data
self.data_layers[name] = {
"data": aligned_data,
"transform": target_transform,
"crs": target_crs,
"original_data": data,
"original_transform": transform,
"original_crs": crs,
"transformed": False,
"target_layer": target_layer_ref,
}
self.logger.info(f"Successfully added layer '{name}' (reprojected):")
self.logger.info(f" Shape: {aligned_data.shape}")
valid_pixels = aligned_data[~np.isnan(aligned_data)] if np.issubdtype(aligned_data.dtype, np.floating) else aligned_data.ravel()
if valid_pixels.size > 0:
self.logger.info(
f" Value range: {valid_pixels.min():.2f} to {valid_pixels.max():.2f}"
)
else:
self.logger.info(" Value range: all nodata (no valid pixels after reprojection)")
except Exception as e:
self.logger.error(f"Failed to reproject layer '{name}': {str(e)}")
raise
else:
# No reprojection needed
self.data_layers[name] = {
"data": data,
"transform": transform,
"crs": crs,
"transformed": False,
"target_layer": target_layer_ref,
}
self.logger.info(f"Added layer '{name}' (no reprojection needed)")
[docs]
def get_bbox_wgs84(self, layer: str = "dem") -> tuple[float, float, float, float]:
"""
Get bounding box in WGS84 coordinates (EPSG:4326).
Returns bbox in standard format used by OSM, web mapping APIs, etc.
Handles reprojection from any source CRS back to WGS84.
Args:
layer: Layer name to get bbox for (default: "dem")
Returns:
Tuple of (south, west, north, east) in WGS84 degrees
Raises:
KeyError: If specified layer doesn't exist
Example:
>>> terrain = Terrain(dem_data, transform, dem_crs="EPSG:32617") # UTM
>>> south, west, north, east = terrain.get_bbox_wgs84()
>>> print(f"Bounds: {south:.4f}°N to {north:.4f}°N, {west:.4f}°E to {east:.4f}°E")
"""
from rasterio.warp import transform_bounds
if layer not in self.data_layers:
raise KeyError(f"Layer '{layer}' not found")
layer_info = self.data_layers[layer]
# Use transformed data/transform if available, otherwise original
if layer_info.get("transformed") and "transformed_data" in layer_info:
data = layer_info["transformed_data"]
layer_transform = layer_info.get("transformed_transform", layer_info["transform"])
layer_crs = layer_info.get("transformed_crs", layer_info["crs"])
else:
data = layer_info["data"]
layer_transform = layer_info["transform"]
layer_crs = layer_info["crs"]
# Calculate bounds from transform and shape
height, width = data.shape
# Top-left corner
x_origin = layer_transform.c
y_origin = layer_transform.f
# Bottom-right corner
x_end = x_origin + layer_transform.a * width
y_end = y_origin + layer_transform.e * height
# Normalize bounds (west, south, east, north)
west = min(x_origin, x_end)
east = max(x_origin, x_end)
south = min(y_origin, y_end)
north = max(y_origin, y_end)
# If already WGS84, just return
if layer_crs in ("EPSG:4326", "epsg:4326", None):
return (south, west, north, east)
# Transform bounds to WGS84
transformed_bounds = transform_bounds(
layer_crs, "EPSG:4326", west, south, east, north
)
# transform_bounds returns (west, south, east, north)
t_west, t_south, t_east, t_north = transformed_bounds
return (t_south, t_west, t_north, t_east)
[docs]
def compute_data_layer(
self,
name: str,
source_layer: str,
compute_func: Callable[[np.ndarray], np.ndarray],
transformed: bool = False,
cache_key: Optional[str] = None,
) -> np.ndarray:
"""
Compute a new data layer from an existing one using a transformation function.
Allows creating derived layers (e.g., slope, aspect, hillshade) from existing data.
Results are stored as new layer and optionally cached.
Args:
name (str): Name for the computed layer.
source_layer (str): Name of existing source layer to compute from.
compute_func (Callable): Function that accepts source array (np.ndarray)
and returns computed array (np.ndarray). Can return same or different shape.
transformed (bool): If True, use already-transformed source data; if False,
use original source data (default: False).
cache_key (str, optional): Custom cache identifier. If None, auto-generated
from layer name and function name.
Returns:
np.ndarray: The computed layer data array.
Raises:
KeyError: If source_layer doesn't exist.
ValueError: If transformed=True but source hasn't been transformed.
Examples:
>>> # Compute slope from DEM using scipy
>>> from scipy.ndimage import sobel
>>> slope = terrain.compute_data_layer(
... 'slope', 'dem',
... lambda dem: np.sqrt(sobel(dem, axis=0)**2 + sobel(dem, axis=1)**2)
... )
>>> # Compute hill-shade visualization
>>> from scipy.ndimage import gaussian_filter
>>> hillshade = terrain.compute_data_layer(
... 'hillshade', 'dem',
... lambda dem: np.clip(gaussian_filter(dem, 2) * 0.5, 0, 1)
... )
>>> # Compute from transformed (downsampled) data
>>> downsampled_slope = terrain.compute_data_layer(
... 'slope_downsampled', 'dem',
... lambda dem: np.gradient(dem)[0],
... transformed=True
... )
"""
self.logger.info(f"Computing layer '{name}' from '{source_layer}'")
# Verify source layer exists
if source_layer not in self.data_layers:
raise KeyError(f"Source layer '{source_layer}' not found")
source_layer_info = self.data_layers[source_layer]
# Check if transformed data is requested but not available
if transformed and not source_layer_info.get("transformed", False):
raise ValueError(f"Source layer '{source_layer}' has not been transformed")
# Get appropriate source data and metadata
if transformed:
source_data = source_layer_info["transformed_data"]
source_transform = source_layer_info["transformed_transform"]
source_crs = source_layer_info["transformed_crs"]
else:
source_data = source_layer_info["data"]
source_transform = source_layer_info["transform"]
source_crs = source_layer_info["crs"]
# Generate cache key if not provided
if cache_key is None:
transform_suffix = "_transformed" if transformed else ""
cache_key = f"{name}_{source_layer}{transform_suffix}_{compute_func.__name__}"
# Try to load from cache
cached = self.cache.load(cache_key)
if cached is None:
self.logger.info(f"Computing {name} from {source_layer}")
try:
# Apply the computation function
computed_data = compute_func(source_data)
# Cache the result with source metadata
self.cache.save(
cache_key,
computed_data,
transform=source_transform,
crs=source_crs,
metadata={"source_layer": source_layer, "transformed": transformed},
)
except Exception as e:
self.logger.error(f"Failed to compute layer '{name}': {str(e)}")
raise
else:
self.logger.info(f"Loaded cached computation for '{name}'")
computed_data = cached["data"]
# Use cached metadata if available
source_transform = cached.get("transform", source_transform)
source_crs = cached.get("crs", source_crs)
# Add the computed layer with correct transform and CRS
self.add_data_layer(name, computed_data, source_transform, source_crs)
return computed_data
def _align_layers_to_targets(self):
"""
Post-processing step after apply_transforms() to align layers with their targets.
For layers added with target_layer parameter, this resamples them to match
the target layer's FINAL transformed shape. This ensures layers with different
source resolutions all end up at the same final resolution.
This is called automatically at the end of apply_transforms().
"""
from rasterio.warp import reproject, Resampling
layers_to_align = []
for name, layer in self.data_layers.items():
target_ref = layer.get("target_layer")
if target_ref and layer.get("transformed", False):
layers_to_align.append((name, target_ref))
if not layers_to_align:
return
self.logger.info(f"Aligning {len(layers_to_align)} layers to their targets...")
for name, target_ref in layers_to_align:
if target_ref not in self.data_layers:
self.logger.warning(
f"Target layer '{target_ref}' not found for layer '{name}', skipping alignment"
)
continue
layer = self.data_layers[name]
target = self.data_layers[target_ref]
if not target.get("transformed", False):
self.logger.warning(
f"Target layer '{target_ref}' not transformed yet, skipping alignment for '{name}'"
)
continue
# Get current and target shapes
current_shape = layer["transformed_data"].shape
target_shape = target["transformed_data"].shape
if current_shape == target_shape:
self.logger.debug(f"Layer '{name}' already matches target shape {target_shape}")
continue
self.logger.info(
f"Resampling '{name}' from {current_shape} to match '{target_ref}' {target_shape}"
)
# Resample to match target shape
aligned_data = np.zeros(target_shape, dtype=layer["transformed_data"].dtype)
try:
reproject(
layer["transformed_data"],
aligned_data,
src_transform=layer["transformed_transform"],
src_crs=layer["transformed_crs"],
dst_transform=target["transformed_transform"],
dst_crs=target["transformed_crs"],
resampling=Resampling.bilinear,
)
# Update layer with aligned data
layer["transformed_data"] = aligned_data
layer["transformed_transform"] = target["transformed_transform"]
layer["transformed_crs"] = target["transformed_crs"]
self.logger.info(f"✓ Aligned '{name}' to match '{target_ref}'")
except Exception as e:
self.logger.error(f"Failed to align '{name}' to '{target_ref}': {str(e)}")
# Don't raise - continue with other layers
[docs]
def detect_water_highres(
self,
slope_threshold: float = 0.01,
fill_holes: bool = True,
scale_factor: float = 0.0001,
) -> np.ndarray:
"""
Detect water bodies on high-resolution DEM before downsampling.
This method properly handles water detection by:
1. Applying all NON-downsampling transforms to DEM (reproject, flip, etc.)
2. Detecting water on the high-resolution transformed DEM
3. Downsampling the water mask to match the final terrain resolution
This prevents false water detection that occurs when detecting on downsampled DEMs.
Args:
slope_threshold: Maximum slope magnitude to classify as water (default: 0.01)
fill_holes: Whether to apply morphological hole filling (default: True)
scale_factor: Elevation scale factor to unscale before detection (default: 0.0001)
Returns:
np.ndarray: Boolean water mask matching transformed_data shape
Raises:
ValueError: If transforms haven't been applied yet
ImportError: If water detection module is not available
Example::
terrain = Terrain(dem, transform, dem_crs="EPSG:4326")
terrain.add_transform(reproject_raster(src_crs="EPSG:4326", dst_crs="EPSG:32617"))
terrain.add_transform(flip_raster(axis="horizontal"))
terrain.add_transform(scale_elevation(scale_factor=0.0001))
terrain.configure_for_target_vertices(target_vertices=1_000_000)
terrain.apply_transforms() # Includes downsampling
# Detect water on high-res BEFORE it was downsampled
water_mask = terrain.detect_water_highres(slope_threshold=0.01)
# water_mask shape matches downsampled terrain
Note:
This method requires that:
- Transforms have been applied (apply_transforms() called)
- The DEM layer exists in data_layers
- Water detection module (src.terrain.water) is available
"""
if "dem" not in self.data_layers:
raise ValueError("DEM layer not found. Cannot detect water without DEM.")
if not self.data_layers["dem"].get("transformed", False):
raise ValueError(
"Transforms have not been applied yet. Call apply_transforms() first."
)
from src.terrain.water import identify_water_by_slope
from scipy.ndimage import zoom
self.logger.info("Detecting water bodies on high-resolution DEM...")
# Get the original DEM
original_dem = self.data_layers["dem"]["data"]
original_transform = self.data_layers["dem"]["transform"]
original_crs = self.data_layers["dem"]["crs"]
# Apply all non-downsampling transforms
highres_dem = original_dem.copy()
current_transform = original_transform
current_crs = original_crs
for transform_func in self.transforms:
# Skip downsampling transforms - we want high-res
if "downsample" in transform_func.__name__.lower():
self.logger.debug(f"Skipping {transform_func.__name__} for high-res water detection")
continue
try:
highres_dem, current_transform, new_crs = transform_func(
highres_dem, current_transform
)
if new_crs is not None:
current_crs = new_crs
except Exception as e:
self.logger.error(
f"Failed applying transform {transform_func.__name__} for water detection: {e}"
)
raise
self.logger.info(f" High-res DEM shape: {highres_dem.shape}")
# Unscale elevation if scale_factor was applied
if scale_factor is not None and scale_factor != 1.0:
unscaled_dem = highres_dem / scale_factor
else:
unscaled_dem = highres_dem
# Detect water on high-res DEM
water_mask_highres = identify_water_by_slope(
unscaled_dem,
slope_threshold=slope_threshold,
fill_holes=fill_holes,
)
water_pixels_highres = np.sum(water_mask_highres)
water_percent_highres = 100 * water_pixels_highres / water_mask_highres.size
self.logger.info(
f" High-res water detection: {water_pixels_highres:,} pixels "
f"({water_percent_highres:.1f}%)"
)
# Now downsample the water mask to match the FINAL transformed DEM
# (after ALL transforms including adaptive downsampling)
# We need to apply ALL transforms to get the actual final shape
final_dem = original_dem.copy()
final_transform = original_transform
final_crs = original_crs
for transform_func in self.transforms:
try:
final_dem, final_transform, new_crs = transform_func(
final_dem, final_transform
)
if new_crs is not None:
final_crs = new_crs
except Exception as e:
self.logger.error(
f"Failed applying transform {transform_func.__name__} for final shape: {e}"
)
raise
target_shape = final_dem.shape
self.logger.debug(
f" Final DEM shape after all transforms: {target_shape}"
)
if water_mask_highres.shape == target_shape:
self.logger.info(f" Water mask already matches target shape {target_shape}")
return water_mask_highres
# Calculate zoom factor for downsampling the mask
zoom_y = target_shape[0] / water_mask_highres.shape[0]
zoom_x = target_shape[1] / water_mask_highres.shape[1]
self.logger.info(
f" Downsampling water mask: {water_mask_highres.shape} → {target_shape}"
)
# Downsample water mask using nearest-neighbor (order=0) to preserve boolean nature
water_mask_downsampled = zoom(
water_mask_highres.astype(np.float32),
zoom=(zoom_y, zoom_x),
order=0, # Nearest neighbor for boolean mask
prefilter=False,
).astype(np.bool_)
water_pixels_final = np.sum(water_mask_downsampled)
water_percent_final = 100 * water_pixels_final / water_mask_downsampled.size
self.logger.info(
f" Final water mask: {water_pixels_final:,} pixels "
f"({water_percent_final:.1f}%)"
)
return water_mask_downsampled
[docs]
def detect_water(
self,
slope_threshold: float = 0.01,
fill_holes: bool = True,
) -> np.ndarray:
"""
Detect water bodies on the transformed (downsampled) DEM.
This is a fast, simple water detection method that works on the already-
transformed DEM. It automatically handles elevation scale factor unscaling.
For higher accuracy at the cost of speed, use detect_water_highres() instead,
which detects water on the full-resolution DEM before downsampling.
Args:
slope_threshold: Maximum slope magnitude to classify as water (default: 0.01).
Water bodies have nearly zero slope.
fill_holes: Whether to apply morphological hole filling (default: True)
Returns:
np.ndarray: Boolean water mask matching transformed_data shape
Raises:
ValueError: If transforms haven't been applied yet
Example:
```python
terrain = Terrain(dem, transform)
terrain.apply_transforms()
water_mask = terrain.detect_water()
terrain.compute_colors(water_mask=water_mask)
```
"""
if "dem" not in self.data_layers:
raise ValueError("DEM layer not found. Cannot detect water without DEM.")
if not self.data_layers["dem"].get("transformed", False):
raise ValueError(
"Transforms have not been applied yet. Call apply_transforms() first."
)
from src.terrain.water import identify_water_by_slope
self.logger.info("Detecting water bodies on transformed DEM...")
# Get the transformed DEM
dem_data = self.data_layers["dem"]["transformed_data"]
# Auto-detect elevation scale factor from model_params or estimate from data range
scale_factor = self.model_params.get("elevation_scale", None)
if scale_factor is None:
# Estimate: if max elevation is << 1, it was probably scaled
max_elev = np.nanmax(dem_data)
if max_elev < 1.0:
# Likely scaled by 0.0001 (max ~0.03 for 300m terrain)
scale_factor = 0.0001
self.logger.debug(f"Auto-detected scale factor: {scale_factor}")
else:
scale_factor = 1.0 # Already in meters
# Unscale to meters for proper slope calculation
if scale_factor != 1.0:
unscaled_dem = dem_data / scale_factor
self.logger.debug(f"Unscaled DEM from {np.nanmax(dem_data):.4f} to {np.nanmax(unscaled_dem):.1f}m")
else:
unscaled_dem = dem_data
# Detect water
water_mask = identify_water_by_slope(
unscaled_dem,
slope_threshold=slope_threshold,
fill_holes=fill_holes,
)
water_pixels = np.sum(water_mask)
water_percent = 100 * water_pixels / water_mask.size
self.logger.info(f" Water detected: {water_pixels:,} pixels ({water_percent:.1f}%)")
return water_mask
[docs]
def set_color_mapping(
self,
color_func: Callable[[np.ndarray, ...], np.ndarray],
source_layers: list[str],
*,
color_kwargs: Optional[Dict[str, Any]] = None,
mask_func: Optional[Callable[[np.ndarray, ...], np.ndarray]] = None,
mask_layers: Optional[list[str] | str] = None,
mask_kwargs: Optional[Dict[str, Any]] = None,
mask_threshold: Optional[float] = None,
) -> None:
"""
Set up how to map data layers to colors (RGB) and optionally a mask/alpha channel.
Allows flexible color mapping by applying a function to one or more data layers.
Optionally applies a separate mask function for transparency/alpha channel control.
Color mapping is applied during mesh creation with `compute_colors()`.
Args:
color_func (Callable): Function that accepts N data arrays (one per source_layers)
and returns colored array of shape (H, W, 3) for RGB or (H, W, 4) for RGBA.
Values should be in range [0, 1] for 8-bit output.
source_layers (list[str]): Names of data layers to pass to color_func, in order.
E.g., ['dem'] for single layer or ['red', 'green', 'blue'] for composite.
color_kwargs (dict, optional): Additional keyword arguments passed to color_func.
mask_func (Callable, optional): Function producing alpha/mask values (0-1) for
transparency. Takes layer arrays as input. If omitted, fully opaque.
mask_layers (list[str] | str, optional): Layer names for mask_func. If None,
uses source_layers. Single string converted to list.
mask_kwargs (dict, optional): Additional keyword arguments for mask_func.
mask_threshold (float, optional): If mask_func is threshold-based, convenience
parameter for threshold value (implementation-dependent).
Returns:
None: Modifies internal color mapping configuration.
Raises:
ValueError: If source_layers or mask_layers refer to non-existent layers.
Examples:
>>> # Single-layer elevation with viridis colormap
>>> from matplotlib.cm import viridis
>>> terrain.set_color_mapping(
... lambda dem: viridis(dem / dem.max()),
... ['dem']
... )
>>> # RGB composite from three layers
>>> terrain.set_color_mapping(
... lambda r, g, b: np.stack([r, g, b], axis=-1),
... ['red_band', 'green_band', 'blue_band']
... )
>>> # Elevation with water transparency mask
>>> terrain.set_color_mapping(
... lambda dem: elevation_colormap(dem),
... ['dem'],
... mask_func=lambda dem: (dem > 0).astype(float),
... mask_layers=['dem']
... )
>>> # Hillshade with elevation colors and slope transparency
>>> terrain.set_color_mapping(
... lambda dem: dem_colors,
... ['dem'],
... mask_func=lambda dem: 1 - np.clip(np.gradient(dem)[0], 0, 1),
... mask_layers=['dem']
... )
"""
# Validate source_layers exist
missing_layers = [name for name in source_layers if name not in self.data_layers]
if missing_layers:
raise ValueError(f"Source layers not found: {missing_layers}")
# Default kwargs dicts
if color_kwargs is None:
color_kwargs = {}
if mask_kwargs is None:
mask_kwargs = {}
# Handle mask layer defaults
if mask_func:
if mask_layers is None:
mask_layers = source_layers
elif isinstance(mask_layers, str):
mask_layers = [mask_layers]
# Validate mask_layers exist
missing_mask_layers = [name for name in mask_layers if name not in self.data_layers]
if missing_mask_layers:
raise ValueError(f"Mask layers not found: {missing_mask_layers}")
else:
mask_layers = []
# Store mapping setup
self.color_mapping = color_func
self.color_sources = list(source_layers)
self.color_kwargs = color_kwargs
self.mask_func = mask_func
self.mask_sources = mask_layers
self.mask_kwargs = mask_kwargs
self.mask_threshold = mask_threshold
# Logging
self.logger.info(f"Color function: {color_func.__name__}")
self.logger.info(f"Color source layers: {source_layers}")
if color_kwargs:
self.logger.info(f"Color kwargs: {color_kwargs}")
if mask_func:
self.logger.info(f"Mask function: {mask_func.__name__}")
self.logger.info(f"Mask source layers: {mask_layers}")
if mask_kwargs:
self.logger.info(f"Mask kwargs: {mask_kwargs}")
if mask_threshold is not None:
self.logger.info(f"Mask threshold: {mask_threshold}")
[docs]
def set_blended_color_mapping(
self,
base_colormap: Callable[[np.ndarray], np.ndarray],
base_source_layers: list[str],
overlay_colormap: Callable[[np.ndarray], np.ndarray],
overlay_source_layers: list[str],
overlay_mask: np.ndarray,
*,
base_color_kwargs: Optional[Dict[str, Any]] = None,
overlay_color_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
"""
Apply two different colormaps based on a spatial mask (hard transition).
Uses base colormap for most of terrain and overlay colormap for masked zones.
This is useful for showing different data types in different regions, such as
elevation colors for general terrain but suitability scores near parks.
Args:
base_colormap: Function mapping base layer(s) to RGB/RGBA. Takes N arrays
(one per base_source_layers) and returns (H, W, 3) or (H, W, 4).
base_source_layers: Layer names to pass to base_colormap, e.g., ['dem'].
overlay_colormap: Function mapping overlay layer(s) to RGB/RGBA. Takes N
arrays (one per overlay_source_layers) and returns (H, W, 3) or (H, W, 4).
overlay_source_layers: Layer names to pass to overlay_colormap, e.g., ['score'].
overlay_mask: Boolean array of shape (num_vertices,) indicating where to use
overlay colormap. True = overlay, False = base. Use compute_proximity_mask()
to create this mask.
base_color_kwargs: Optional kwargs passed to base_colormap.
overlay_color_kwargs: Optional kwargs passed to overlay_colormap.
Returns:
None: Modifies internal color mapping to use blended approach.
Raises:
ValueError: If source layers don't exist or overlay_mask has wrong shape.
RuntimeError: If create_mesh() hasn't been called yet (needed for mask validation).
Example:
>>> from src.terrain.color_mapping import elevation_colormap
>>> # Compute proximity mask for park zones
>>> park_mask = terrain.compute_proximity_mask(
... park_lons, park_lats, radius_meters=1000
... )
>>> # Set dual colormaps
>>> terrain.set_blended_color_mapping(
... base_colormap=lambda elev: elevation_colormap(
... elev, cmap_name="gist_earth"
... ),
... base_source_layers=["dem"],
... overlay_colormap=lambda score: elevation_colormap(
... score, cmap_name="cool", min_elev=0, max_elev=1
... ),
... overlay_source_layers=["score"],
... overlay_mask=park_mask
... )
>>> terrain.compute_colors() # Apply the blended mapping
"""
# Validate source_layers exist
all_layers = set(base_source_layers) | set(overlay_source_layers)
missing_layers = [name for name in all_layers if name not in self.data_layers]
if missing_layers:
raise ValueError(f"Source layers not found: {missing_layers}")
# Validate overlay_mask (can be grid-space or vertex-space)
# Actual shape validation happens in _compute_blended_colors()
overlay_mask = np.asarray(overlay_mask)
# Check if it's a valid 1D or 2D array
if overlay_mask.ndim not in (1, 2):
raise ValueError(
f"overlay_mask must be 1D (vertex-space) or 2D (grid-space). "
f"Got shape {overlay_mask.shape}."
)
# Default kwargs
if base_color_kwargs is None:
base_color_kwargs = {}
if overlay_color_kwargs is None:
overlay_color_kwargs = {}
# Store blended color mapping configuration
self.color_mapping_mode = "blended"
self.base_colormap = base_colormap
self.base_color_sources = list(base_source_layers)
self.base_color_kwargs = base_color_kwargs
self.overlay_colormap = overlay_colormap
self.overlay_color_sources = list(overlay_source_layers)
self.overlay_color_kwargs = overlay_color_kwargs
self.overlay_mask = overlay_mask
self.logger.info("Blended color mapping configured:")
self.logger.info(f" Base colormap: {base_colormap.__name__} on {base_source_layers}")
self.logger.info(f" Overlay colormap: {overlay_colormap.__name__} on {overlay_source_layers}")
# Log mask info (grid-space or vertex-space)
mask_sum = np.sum(overlay_mask)
mask_size = overlay_mask.size
mask_type = "grid-space" if overlay_mask.ndim == 2 else "vertex-space"
self.logger.info(
f" Overlay mask ({mask_type}): {mask_sum}/{mask_size} elements "
f"({100.0 * mask_sum / mask_size:.1f}%)"
)
[docs]
def set_multi_color_mapping(
self,
base_colormap: Callable[[np.ndarray, ...], np.ndarray],
base_source_layers: list[str],
overlays: list[dict],
*,
base_color_kwargs: Optional[Dict[str, Any]] = None,
) -> None:
"""
Apply multiple data layers as overlays with different colormaps and priority.
This enables flexible data visualization where multiple geographic features
(roads, trails, land use, power lines, etc.) can each be colored independently
based on their own colormaps and source layers.
Args:
base_colormap: Function mapping base layer(s) to RGB/RGBA. Takes N arrays
(one per base_source_layers) and returns (H, W, 3) or (H, W, 4).
base_source_layers: Layer names for base_colormap, e.g., ['dem'].
overlays: List of overlay specifications. Each overlay dict contains
``colormap`` (function returning H,W,3/4), ``source_layers`` (list of
layer names), ``colormap_kwargs`` (optional dict), ``threshold`` (value
above which overlay applies, default 0.5), and ``priority`` (lower =
higher priority).
base_color_kwargs: Optional kwargs passed to base_colormap.
Returns:
None: Modifies internal color mapping to use multi-overlay approach.
Raises:
ValueError: If source layers don't exist or overlay specs are invalid.
Example:
>>> # Base elevation colors with roads and trails overlays
>>> terrain.set_multi_color_mapping(
... base_colormap=lambda elev: elevation_colormap(elev, "michigan"),
... base_source_layers=["dem"],
... overlays=[
... {
... "colormap": lambda roads: colormap_roads(roads),
... "source_layers": ["roads"],
... "priority": 10, # High priority roads show on top
... },
... {
... "colormap": lambda trails: colormap_trails(trails),
... "source_layers": ["trails"],
... "priority": 20, # Lower priority
... },
... ]
... )
>>> terrain.compute_colors()
"""
# Validate all source_layers exist
all_layers = set(base_source_layers)
for overlay in overlays:
all_layers.update(overlay.get("source_layers", []))
missing_layers = [name for name in all_layers if name not in self.data_layers]
if missing_layers:
raise ValueError(f"Source layers not found: {missing_layers}")
# Validate overlay specs
for i, overlay in enumerate(overlays):
if "colormap" not in overlay:
raise ValueError(f"Overlay {i} missing required 'colormap' key")
if "source_layers" not in overlay:
raise ValueError(f"Overlay {i} missing required 'source_layers' key")
if "priority" not in overlay:
raise ValueError(f"Overlay {i} missing required 'priority' key")
# Sort overlays by priority (lower number = higher priority = applied first)
sorted_overlays = sorted(overlays, key=lambda x: x["priority"])
# Default kwargs
if base_color_kwargs is None:
base_color_kwargs = {}
# Store multi-overlay configuration
self.color_mapping_mode = "multi_overlay"
self.base_colormap = base_colormap
self.base_color_sources = list(base_source_layers)
self.base_color_kwargs = base_color_kwargs
self.overlays = sorted_overlays
self.logger.info("Multi-overlay color mapping configured:")
self.logger.info(f" Base colormap: {base_colormap.__name__} on {base_source_layers}")
self.logger.info(f" Number of overlays: {len(overlays)}")
for i, overlay in enumerate(sorted_overlays):
has_mask = "mask" in overlay
mask_info = f", has_mask={has_mask}" if has_mask else ""
threshold = overlay.get("threshold", "default")
self.logger.info(
f" Overlay {i} (priority {overlay['priority']}): "
f"{overlay['colormap'].__name__} on {overlay['source_layers']}"
f" [threshold={threshold}{mask_info}]"
)
[docs]
def compute_colors(self, water_mask=None):
"""
Compute colors using color_func and optionally mask_func.
Supports three modes:
- Standard: Single colormap applied to all vertices
- Blended: Two colormaps blended based on proximity mask
- Multi-overlay: Multiple overlays with different colormaps and priority
Args:
water_mask (np.ndarray, optional): Boolean water mask in grid space (height × width).
For blended mode, water pixels will be colored blue in the final vertex colors.
For standard and multi-overlay modes, water detection is handled in create_mesh().
Returns:
np.ndarray: RGBA color array.
"""
# Check if multi-overlay mode
if hasattr(self, "color_mapping_mode") and self.color_mapping_mode == "multi_overlay":
return self._compute_multi_overlay_colors(water_mask=water_mask)
# Check if blended mode
if hasattr(self, "color_mapping_mode") and self.color_mapping_mode == "blended":
return self._compute_blended_colors(water_mask=water_mask)
# Standard single colormap mode
if not hasattr(self, "color_mapping") or not hasattr(self, "color_sources"):
raise ValueError("Color mapping not set. Call set_color_mapping() first.")
self.logger.info("Computing colors...")
# Prepare color data arrays
color_arrays = [
(
self.data_layers[layer]["transformed_data"]
if self.data_layers[layer].get("transformed")
else self.data_layers[layer]["data"]
)
for layer in self.color_sources
]
# Compute base colors
try:
colors = self.color_mapping(*color_arrays, **self.color_kwargs)
except Exception as e:
self.logger.error(f"Error computing colors: {str(e)}")
raise
# Ensure RGBA
if colors.shape[-1] == 3:
# Create alpha channel with appropriate max value for the data type
if colors.dtype == np.uint8:
alpha_channel = np.full(colors.shape[:2] + (1,), 255, dtype=colors.dtype)
else:
alpha_channel = np.ones(colors.shape[:2] + (1,), dtype=colors.dtype)
colors = np.concatenate([colors, alpha_channel], axis=-1)
# Apply mask if provided
if self.mask_func:
mask_arrays = [
(
self.data_layers[layer]["transformed_data"]
if self.data_layers[layer].get("transformed")
else self.data_layers[layer]["data"]
)
for layer in self.mask_sources
]
try:
mask = self.mask_func(*mask_arrays, **self.mask_kwargs)
except Exception as e:
self.logger.error(f"Error computing mask: {str(e)}")
raise
# Apply threshold if provided
if self.mask_threshold is not None:
mask = mask >= self.mask_threshold
# Update alpha channel based on mask
colors[..., 3] = np.where(mask, 0.0, 1.0)
self.colors = colors
self.logger.info(f"Colors computed successfully with shape {colors.shape}")
return colors
def _compute_blended_colors(self, water_mask=None):
"""
Compute colors using blended colormap mode (internal method).
Computes base colors for entire DEM, overlay colors for overlay zones,
and blends them according to overlay_mask at the vertex level.
Args:
water_mask (np.ndarray, optional): Boolean water mask in grid space (height × width).
If provided, water pixels will be colored blue in the final vertex colors.
Returns:
np.ndarray: RGBA color array with blended colors.
"""
self.logger.info("Computing blended colors...")
# Get transformed DEM data to determine grid shape
dem_data = self.data_layers["dem"]["transformed_data"]
height, width = dem_data.shape
# Compute base colors for all pixels
base_arrays = [
(
self.data_layers[layer]["transformed_data"]
if self.data_layers[layer].get("transformed")
else self.data_layers[layer]["data"]
)
for layer in self.base_color_sources
]
try:
base_colors_grid = self.base_colormap(*base_arrays, **self.base_color_kwargs)
except Exception as e:
self.logger.error(f"Error computing base colors: {str(e)}")
raise
# Compute overlay colors for all pixels
overlay_arrays = [
(
self.data_layers[layer]["transformed_data"]
if self.data_layers[layer].get("transformed")
else self.data_layers[layer]["data"]
)
for layer in self.overlay_color_sources
]
try:
overlay_colors_grid = self.overlay_colormap(*overlay_arrays, **self.overlay_color_kwargs)
except Exception as e:
self.logger.error(f"Error computing overlay colors: {str(e)}")
raise
# Ensure both are RGBA
for colors_grid, name in [(base_colors_grid, "base"), (overlay_colors_grid, "overlay")]:
if colors_grid.shape[-1] == 3:
if colors_grid.dtype == np.uint8:
alpha = np.full(colors_grid.shape[:2] + (1,), 255, dtype=colors_grid.dtype)
else:
alpha = np.ones(colors_grid.shape[:2] + (1,), dtype=colors_grid.dtype)
if name == "base":
base_colors_grid = np.concatenate([colors_grid, alpha], axis=-1)
else:
overlay_colors_grid = np.concatenate([colors_grid, alpha], axis=-1)
# Map grid colors to vertex colors using y_valid, x_valid
# These map vertex index to (row, col) in the grid
base_vertex_colors = base_colors_grid[self.y_valid, self.x_valid]
overlay_vertex_colors = overlay_colors_grid[self.y_valid, self.x_valid]
# Handle boundary vertices if they exist
# Boundary vertices (from boundary_extension) don't map to grid pixels
# Use nearest valid pixel color (or default color)
num_surface_vertices = len(self.y_valid)
# vertices may not be set yet if compute_colors() is called early
num_total_vertices = len(self.vertices) if self.vertices is not None else num_surface_vertices
if num_total_vertices > num_surface_vertices:
# Has boundary vertices - pad with default color
self.logger.info(
f" Padding colors for {num_total_vertices - num_surface_vertices} boundary vertices"
)
# Use mean color for boundary (or could use edge colors)
default_color = np.mean(base_vertex_colors, axis=0).astype(base_vertex_colors.dtype)
# Extend vertex colors arrays
base_vertex_colors = np.vstack(
[base_vertex_colors, np.tile(default_color, (num_total_vertices - num_surface_vertices, 1))]
)
overlay_vertex_colors = np.vstack(
[
overlay_vertex_colors,
np.tile(default_color, (num_total_vertices - num_surface_vertices, 1)),
]
)
# Convert overlay_mask to vertex-space if it's grid-space
if self.overlay_mask.ndim == 2:
# Grid-space mask: convert to vertex-space using y_valid, x_valid
self.logger.info(f" Converting grid-space mask to vertex-space...")
overlay_mask_vertex = self.overlay_mask[self.y_valid, self.x_valid]
# Pad with False for boundary vertices if they exist
if num_total_vertices > num_surface_vertices:
padding = np.zeros(num_total_vertices - num_surface_vertices, dtype=bool)
overlay_mask_vertex = np.concatenate([overlay_mask_vertex, padding])
else:
# Already vertex-space
overlay_mask_vertex = self.overlay_mask
# Blend colors using overlay_mask: True = overlay, False = base
colors = np.where(
overlay_mask_vertex[:, None], # Broadcast to (N, 1) for RGBA channels
overlay_vertex_colors,
base_vertex_colors,
)
num_overlay = np.sum(overlay_mask_vertex)
num_base = len(overlay_mask_vertex) - num_overlay
self.logger.info(
f"Blended colors computed: {num_overlay} overlay vertices, {num_base} base vertices"
)
# Apply water coloring if water mask provided
if water_mask is not None:
self.logger.info(f"Applying water coloring to blended vertex colors...")
self.logger.debug(f" Water mask shape: {water_mask.shape}")
self.logger.debug(f" DEM shape: {dem_data.shape}")
self.logger.debug(f" Num surface vertices: {num_surface_vertices}")
self.logger.debug(f" y_valid range: {self.y_valid.min()}-{self.y_valid.max()}")
self.logger.debug(f" x_valid range: {self.x_valid.min()}-{self.x_valid.max()}")
# Create shoreline vignette for water bodies (cartographic style)
# Compute distance transform to measure distance from water edges (shores)
from scipy.ndimage import distance_transform_edt
water_distances = distance_transform_edt(water_mask)
# Define gradient colors for shoreline vignette
edge_color = np.array([25, 85, 125], dtype=np.float32) # Light blue (shore)
center_color = np.array([15, 50, 85], dtype=np.float32) # Dark blue (interior)
# Map water mask from grid space to vertex space (vectorized)
# Only for surface vertices (not boundary vertices)
water_at_vertices = water_mask[self.y_valid, self.x_valid]
water_vertex_indices = np.where(water_at_vertices)[0]
# Get raw pixel distances for all water vertices
water_y = self.y_valid[water_vertex_indices]
water_x = self.x_valid[water_vertex_indices]
water_pixel_distances = water_distances[water_y, water_x]
# Cartographic shoreline vignette style (vintage map aesthetic)
# Gradient only in shoreline band; interior water is uniform dark
shoreline_width_pixels = 12
# t=1 means dark (interior), t=0 means light (at shore edge)
# Start with all water as interior (dark)
t = np.ones_like(water_pixel_distances)
# Apply gradient only within shoreline band
in_shoreline_band = water_pixel_distances < shoreline_width_pixels
t[in_shoreline_band] = water_pixel_distances[in_shoreline_band] / shoreline_width_pixels
# Power curve for smoother transition
t = np.power(t, 0.5)[:, np.newaxis]
water_colors = edge_color * (1 - t) + center_color * t
# Apply gradient colors to water vertices
surface_colors = colors[:num_surface_vertices]
surface_colors[water_vertex_indices, :3] = water_colors.astype(np.uint8)
water_vertex_count = len(water_vertex_indices)
self.logger.info(f"Water colored blue ({water_vertex_count} vertices)")
self.colors = colors
return colors
def _compute_multi_overlay_colors(self, water_mask=None):
"""
Compute colors using multi-overlay mode (internal method).
Combines base colormap with multiple overlays. For each grid pixel, applies the
first overlay (by priority) whose source data is non-zero and non-NaN. Falls back
to base colormap if no overlays match.
Args:
water_mask (np.ndarray, optional): Boolean water mask in grid space (height × width).
If provided, water pixels will be colored blue in the final vertex colors.
Returns:
np.ndarray: RGBA color array with multi-overlay colors.
"""
self.logger.info("Computing multi-overlay colors...")
# Get transformed DEM data to determine grid shape (fall back to original if not transformed)
dem_layer = self.data_layers["dem"]
if "transformed_data" in dem_layer:
dem_data = dem_layer["transformed_data"]
else:
dem_data = dem_layer["data"]
height, width = dem_data.shape
# Compute base colors for all pixels
base_arrays = [
(
self.data_layers[layer]["transformed_data"]
if self.data_layers[layer].get("transformed")
else self.data_layers[layer]["data"]
)
for layer in self.base_color_sources
]
try:
base_colors_grid = self.base_colormap(*base_arrays, **self.base_color_kwargs)
except Exception as e:
self.logger.error(f"Error computing base colors: {str(e)}")
raise
# Ensure base colors are RGBA
if base_colors_grid.shape[-1] == 3:
if base_colors_grid.dtype == np.uint8:
alpha = np.full(base_colors_grid.shape[:2] + (1,), 255, dtype=base_colors_grid.dtype)
else:
alpha = np.ones(base_colors_grid.shape[:2] + (1,), dtype=base_colors_grid.dtype)
base_colors_grid = np.concatenate([base_colors_grid, alpha], axis=-1)
# Initialize result grid with base colors
result_colors_grid = np.copy(base_colors_grid)
# Apply overlays in priority order
for overlay_idx, overlay in enumerate(self.overlays):
overlay_colormap = overlay["colormap"]
overlay_sources = overlay["source_layers"]
overlay_kwargs = overlay.get("colormap_kwargs", {})
priority = overlay["priority"]
# Get overlay source data
overlay_arrays = [
(
self.data_layers[layer]["transformed_data"]
if self.data_layers[layer].get("transformed")
else self.data_layers[layer]["data"]
)
for layer in overlay_sources
]
try:
overlay_colors_grid = overlay_colormap(*overlay_arrays, **overlay_kwargs)
except Exception as e:
self.logger.error(f"Error computing overlay {overlay_idx} colors: {str(e)}")
raise
# Ensure overlay colors are RGBA
if overlay_colors_grid.shape[-1] == 3:
if overlay_colors_grid.dtype == np.uint8:
alpha = np.full(overlay_colors_grid.shape[:2] + (1,), 255, dtype=overlay_colors_grid.dtype)
else:
alpha = np.ones(overlay_colors_grid.shape[:2] + (1,), dtype=overlay_colors_grid.dtype)
overlay_colors_grid = np.concatenate([overlay_colors_grid, alpha], axis=-1)
# Create mask for where this overlay applies
# Option 1: Explicit mask provided (e.g., park_mask for proximity-based overlays)
# Option 2: Threshold-based mask from first source layer value
explicit_mask = overlay.get("mask", None)
use_explicit_mask = False
if explicit_mask is not None:
# Explicit mask provided - can be grid-space or vertex-space
explicit_mask = np.asarray(explicit_mask)
grid_shape = overlay_arrays[0].shape
has_mesh = hasattr(self, "y_valid") and self.y_valid is not None
if explicit_mask.shape == grid_shape:
# Already grid-space - preferred form
overlay_mask = explicit_mask.astype(bool)
use_explicit_mask = True
self.logger.info(
f"Overlay {overlay_idx}: using grid-space mask directly, "
f"{np.sum(overlay_mask)} grid pixels"
)
elif has_mesh and explicit_mask.shape == (len(self.y_valid),):
# Convert vertex mask to grid mask (exact match)
overlay_mask = np.zeros(grid_shape, dtype=bool)
overlay_mask[self.y_valid, self.x_valid] = explicit_mask
use_explicit_mask = True
self.logger.info(
f"Overlay {overlay_idx}: converted vertex mask to grid mask, "
f"{np.sum(overlay_mask)} grid pixels"
)
elif has_mesh and len(explicit_mask.shape) == 1 and len(explicit_mask) >= len(self.y_valid):
# Vertex-space mask but might include boundary vertices
self.logger.info(
f"Overlay {overlay_idx}: mask has {len(explicit_mask)} entries, "
f"using first {len(self.y_valid)} for surface vertices"
)
overlay_mask = np.zeros(grid_shape, dtype=bool)
overlay_mask[self.y_valid, self.x_valid] = explicit_mask[:len(self.y_valid)]
use_explicit_mask = True
self.logger.info(
f"Overlay {overlay_idx}: converted vertex mask to grid mask, "
f"{np.sum(overlay_mask)} grid pixels"
)
else:
self.logger.warning(
f"Overlay {overlay_idx}: mask shape {explicit_mask.shape} doesn't match "
f"grid shape {grid_shape}. Falling back to threshold-based mask."
)
if not use_explicit_mask:
# Use threshold-based mask from source layer values
overlay_mask_data = overlay_arrays[0]
threshold = overlay.get("threshold", 0.5)
# Special case: threshold=0.0 should exclude zeros (use > not >=)
# This handles sparse layers like stream networks where non-feature pixels are 0
if threshold == 0.0:
overlay_mask = (overlay_mask_data > 0.0) & ~np.isnan(overlay_mask_data)
else:
overlay_mask = (overlay_mask_data >= threshold) & ~np.isnan(overlay_mask_data)
self.logger.info(
f"Overlay {overlay_idx}: using threshold={threshold}, "
f"{np.sum(overlay_mask)} grid pixels"
)
# Apply overlay colors where mask is True
result_colors_grid[overlay_mask] = overlay_colors_grid[overlay_mask]
self.logger.debug(
f"Overlay {overlay_idx} (priority {priority}): "
f"applied to {np.sum(overlay_mask)} grid pixels"
)
# Store grid-level colors (create_mesh will handle vertex mapping and water coloring)
# This matches the pattern used by standard mode compute_colors()
self.colors = result_colors_grid
self.logger.info(f"Multi-overlay colors computed: {len(self.overlays)} overlays applied")
self.logger.info(f"Colors stored as grid-space array: {result_colors_grid.shape}")
return result_colors_grid
[docs]
def create_mesh(
self,
base_depth=0.2,
boundary_extension=True,
scale_factor=100.0,
height_scale=1.0,
center_model=True,
verbose=True,
detect_water=False,
water_slope_threshold=0.5,
water_mask=None,
two_tier_edge=True,
edge_mid_depth=None,
edge_base_material="clay",
edge_blend_colors=False,
smooth_boundary=False,
smooth_boundary_window=5,
use_catmull_rom=False,
catmull_rom_subdivisions=10,
use_rectangle_edges=True,
use_fractional_edges=True,
edge_sample_spacing=0.33,
):
"""
Create a Blender mesh from transformed DEM data with both performance and control.
Generates vertices from DEM elevation values and faces for connectivity. Optionally
creates boundary faces to close the mesh into a solid. Supports coordinate scaling
and elevation scaling for visualization. Can optionally detect and apply water bodies
to vertex alpha channel for water rendering.
Args:
base_depth (float): Positive depth offset below minimum surface elevation (default: 0.2).
Creates a flat base plane at: min_surface_z - base_depth.
Used when boundary_extension=True to create side faces.
Positive values extend below surface, negative extend above.
boundary_extension (bool): Whether to create side faces around the terrain boundary
to close the mesh (default: True). If False, creates open terrain surface.
scale_factor (float): Horizontal scale divisor for x/y coordinates (default: 100.0).
Higher values produce smaller meshes. E.g., 100 means 100 DEM units = 1 Blender unit.
height_scale (float): Multiplier for elevation values (default: 1.0). Vertically
exaggerates or reduces terrain features. Values > 1 exaggerate, < 1 flatten.
center_model (bool): Whether to center the model at origin (default: True).
Centers XY coordinates but preserves absolute Z elevation values.
verbose (bool): Whether to log detailed progress information (default: True).
detect_water (bool): Whether to detect water bodies and apply to alpha channel
(default: False). Uses slope-based detection on transformed DEM.
water_slope_threshold (float): Maximum slope magnitude to classify as water
(default: 0.5). Only used if detect_water=True and water_mask is None.
water_mask (np.ndarray): Pre-computed boolean water mask (True=water, False=land).
If provided, this mask is used instead of computing water detection.
Allows water detection on unscaled DEM before elevation scaling transforms.
two_tier_edge (bool): Enable two-tier edge extrusion (default: True).
Creates a small colored edge near the surface with a larger uniform base below.
edge_mid_depth (float): Positive depth offset below surface for middle tier (default: auto-calculated).
If None, automatically set to base_depth * 0.25 (typically 0.05).
Positive values extend below surface, negative extend above surface.
edge_base_material (str | tuple): Material for base layer (default: "clay").
Either a preset name ("clay", "obsidian", "chrome", "plastic", "gold", "ivory")
or an RGB tuple (0-1 range).
edge_blend_colors (bool): Blend surface colors to mid tier in two-tier mode (default: False).
If False, mid tier uses base_material color for sharp transition between mesh and edge.
smooth_boundary (bool): Apply smoothing to boundary points to eliminate stair-step edges
(default: False). Useful for smoother mesh transitions when using two-tier edge.
smooth_boundary_window (int): Window size for boundary smoothing (default: 5).
Larger values produce more smoothing. Only used when smooth_boundary=True.
use_catmull_rom (bool): Use Catmull-Rom curve fitting for smooth boundary geometry
(default: False). When enabled, eliminates pixel-grid staircase pattern entirely
by fitting smooth parametric curve through boundary points.
catmull_rom_subdivisions (int): Number of interpolated points per boundary segment
when using Catmull-Rom curves (default: 10). Higher values = smoother curve
but more vertices. Only used when use_catmull_rom=True.
use_rectangle_edges (bool): Use rectangle-edge sampling instead of morphological
boundary detection (default: True). ~150x faster than morphological detection.
Ideal for rectangular DEMs from raster sources. Uses original DEM shape to
generate clean, regularly-sampled edge vertices.
use_fractional_edges (bool): Use fractional edge coordinates that preserve projection
curvature (default: True). When enabled with use_rectangle_edges, surface tier aligns
with mesh boundary (no gap) while mid/base tiers use fractional X,Y positions to follow
smooth WGS84→UTM projection curves. Creates geographically accurate curved edges that
connect seamlessly to terrain.
edge_sample_spacing (float): Pixel spacing for edge/skirt vertices (default: 0.33).
Lower values = denser skirt (smoother but more memory). 0.33 = 3x denser than
mesh edge. 1.0 = same density as mesh edge. 2.0 = half density (good for large
meshes to avoid OOM).
Returns:
bpy.types.Object | None: The created terrain mesh object, or None if creation failed.
Raises:
ValueError: If transformed DEM layer is not available (apply_transforms() not called).
Examples:
# Default two-tier edge with smooth fractional edges and red clay base
mesh = terrain.create_mesh(boundary_extension=True)
# Single-tier edge (backwards compatible)
mesh = terrain.create_mesh(two_tier_edge=False)
# Two-tier with gold base material
mesh = terrain.create_mesh(edge_base_material="gold")
# Two-tier with deeper edge (1 unit below min surface, 0.2 units below surface for mid)
mesh = terrain.create_mesh(
base_depth=1.0, # Deep base (1 unit below min surface)
edge_mid_depth=0.2 # Deeper colored edge (0.2 units below surface)
)
# Two-tier with custom RGB color
mesh = terrain.create_mesh(edge_base_material=(0.6, 0.55, 0.5))
# Disable fractional edges for simple rectangular boundary
mesh = terrain.create_mesh(use_fractional_edges=False)
"""
start_time = time.time()
self.logger.info("Creating terrain mesh...")
# Get transformed DEM data
if "dem" not in self.data_layers or not self.data_layers["dem"].get("transformed", False):
raise ValueError("Transformed DEM layer required for mesh creation")
# Note: Color computation moved to after vertex position generation
# (multi-overlay mode needs y_valid and x_valid to exist)
# Water detection - store mask for later (coloring happens after compute_colors)
_water_mask_for_coloring = None
if detect_water or water_mask is not None:
if water_mask is None:
# Compute water mask from transformed DEM
dem_data = self.data_layers["dem"]["transformed_data"]
self.logger.info(
f"Detecting water bodies (slope threshold: {water_slope_threshold})..."
)
from src.terrain.water import identify_water_by_slope
water_mask = identify_water_by_slope(
dem_data, slope_threshold=water_slope_threshold, fill_holes=True
)
else:
self.logger.info(
f"Using pre-computed water mask ({np.sum(water_mask)} water pixels)"
)
_water_mask_for_coloring = water_mask
dem_data = self.data_layers["dem"]["transformed_data"]
height, width = dem_data.shape
# Create valid points mask (non-NaN values)
valid_mask = ~np.isnan(dem_data)
# Generate vertex positions with scaling
self.logger.info("Generating vertex positions...")
from src.terrain.mesh_operations import generate_vertex_positions
positions, y_valid, x_valid = generate_vertex_positions(
dem_data, valid_mask, scale_factor, height_scale
)
# Store y_valid and x_valid as instance attributes for color computation
self.y_valid = y_valid
self.x_valid = x_valid
# Compute colors if color mapping is set and colors haven't been computed yet
# Check for any color mapping mode (standard, blended, or multi-overlay)
has_color_mapping = (
hasattr(self, "color_mapping") or
hasattr(self, "base_colormap") or
hasattr(self, "color_mapping_mode")
)
if has_color_mapping and not hasattr(self, "colors"):
self.compute_colors()
# Apply water coloring AFTER color computation (so we modify the colormap colors)
if _water_mask_for_coloring is not None and hasattr(self, "colors") and self.colors is not None:
from scipy.ndimage import distance_transform_edt
water_mask = _water_mask_for_coloring
# Validate and resample water mask to match colors shape if needed
expected_shape = self.colors.shape[:2] if self.colors.ndim == 3 else dem_data.shape
if water_mask.shape != expected_shape:
self.logger.warning(
f"Water mask shape {water_mask.shape} does not match colors shape {expected_shape}. "
f"Resampling water mask to match colors. This can happen when colors are computed from "
f"a different layer than DEM (e.g., score layers)."
)
# Resample water mask to match colors shape
zoom_y = expected_shape[0] / water_mask.shape[0]
zoom_x = expected_shape[1] / water_mask.shape[1]
water_mask = zoom(
water_mask.astype(np.float32),
zoom=(zoom_y, zoom_x),
order=0, # Nearest neighbor for boolean
prefilter=False,
).astype(np.bool_)
water_distances = distance_transform_edt(water_mask)
# Define gradient colors: blue to dark blue (Great Lakes depth)
edge_color = np.array([25, 85, 125], dtype=np.float32) # Medium blue (shore)
center_color = np.array([15, 50, 85], dtype=np.float32) # Deep blue (center)
# Map water mask (2D grid) to vertex indices
# self.colors is a 1D vertex array, not a 2D grid
water_at_vertices = water_mask[self.y_valid, self.x_valid]
water_vertex_indices = np.where(water_at_vertices)[0]
# Get raw pixel distances for water vertices using grid coordinates
water_y = self.y_valid[water_vertex_indices]
water_x = self.x_valid[water_vertex_indices]
water_pixel_distances = water_distances[water_y, water_x]
# Cartographic shoreline vignette style (vintage map aesthetic)
# Gradient only in shoreline band; interior water is uniform dark
shoreline_width_pixels = 12
# t=1 means dark (interior), t=0 means light (at shore edge)
# Start with all water as interior (dark)
t = np.ones_like(water_pixel_distances)
# Apply gradient only within shoreline band
in_shoreline_band = water_pixel_distances < shoreline_width_pixels
t[in_shoreline_band] = water_pixel_distances[in_shoreline_band] / shoreline_width_pixels
# Power curve for smoother transition
t = np.power(t, 0.5)[:, np.newaxis]
water_colors = edge_color * (1 - t) + center_color * t
# Apply water colors - handle both grid-space (H, W, 4) and vertex-space (N, 4) colors
if self.colors.ndim == 3:
# Grid-space colors - index using y, x coordinates directly
self.colors[water_y, water_x, :3] = water_colors.astype(np.uint8)
else:
# Vertex-space colors - index using vertex indices
self.colors[water_vertex_indices, :3] = water_colors.astype(np.uint8)
self.logger.info(f"Water colored with depth gradient ({np.sum(water_mask)} water pixels)")
# Center the model if requested
if center_model:
self.logger.info("Centering model at origin...")
# Calculate centroid
centroid = np.mean(positions, axis=0)
# Center horizontally (x, y) but preserve elevation (z)
positions[:, 0] -= centroid[0]
positions[:, 1] -= centroid[1]
# Store offset for later reference (camera positioning)
self.model_offset = centroid
else:
self.model_offset = np.array([0, 0, 0])
# Store model parameters for reference
self.model_params = {
"scale_factor": scale_factor,
"height_scale": height_scale,
"centered": center_model,
"offset": self.model_offset.tolist(),
"base_depth": base_depth,
"two_tier_edge": two_tier_edge,
"edge_mid_depth": edge_mid_depth,
"edge_base_material": edge_base_material,
"edge_blend_colors": edge_blend_colors,
}
# Create mapping from (y,x) coords to vertex indices - using dictionaries for O(1) lookups
self.logger.info("Creating coordinate to index mapping...")
coord_to_index = {(y, x): i for i, (y, x) in enumerate(zip(y_valid, x_valid))}
# OPTIMIZATION: Find boundary points using morphological operations
self.logger.info("Finding boundary points with optimized algorithm...")
from src.terrain.mesh_operations import find_boundary_points
boundary_coords = find_boundary_points(valid_mask)
# Only sort boundary points if needed (they're used for side faces)
boundary_winding = "counter-clockwise" # Default winding direction
if boundary_extension:
boundary_points = self._sort_boundary_points_optimized(boundary_coords)
# Determine winding direction based on boundary type
if use_rectangle_edges:
# Rectangle edge sampling always traces clockwise in image coordinates
# (top→right→bottom→left with y increasing downward)
boundary_winding = "clockwise"
self.logger.info(f"Boundary winding direction: {boundary_winding} (rectangle edges always clockwise)")
elif len(boundary_points) >= 3:
# Compute signed area to determine winding for morphological boundary
# Positive = counter-clockwise, Negative = clockwise
signed_area = 0
for i in range(len(boundary_points)):
y1, x1 = boundary_points[i]
y2, x2 = boundary_points[(i + 1) % len(boundary_points)]
signed_area += (x2 - x1) * (y2 + y1)
boundary_winding = "counter-clockwise" if signed_area < 0 else "clockwise"
self.logger.info(f"Boundary winding direction: {boundary_winding} (signed area: {signed_area:.2f})")
else:
boundary_points = boundary_coords
# OPTIMIZATION: Vectorized face generation using NumPy operations
self.logger.info("Generating faces with vectorized operations...")
from src.terrain.mesh_operations import generate_faces
faces = generate_faces(height, width, coord_to_index)
# Handle boundary extension if needed
if boundary_extension:
self.logger.info("Creating optimized boundary extension...")
from src.terrain.mesh_operations import create_boundary_extension
# Get surface colors for two-tier edge (if available and two_tier_edge enabled)
surface_colors = None
if two_tier_edge and hasattr(self, "colors") and self.colors is not None:
# Flatten colors if they're 2D grid (H, W, 3) to 1D (N, 3)
if self.colors.ndim == 3:
surface_colors = self.colors[y_valid, x_valid, :]
else:
surface_colors = self.colors
# Call create_boundary_extension with two-tier and smoothing parameters
result = create_boundary_extension(
positions,
boundary_points,
coord_to_index,
base_depth,
two_tier=two_tier_edge,
mid_depth=edge_mid_depth,
base_material=edge_base_material,
blend_edge_colors=edge_blend_colors,
surface_colors=surface_colors,
smooth_boundary=smooth_boundary,
smooth_window_size=smooth_boundary_window,
use_catmull_rom=use_catmull_rom,
catmull_rom_subdivisions=catmull_rom_subdivisions,
use_rectangle_edges=use_rectangle_edges,
terrain=self if use_rectangle_edges else None, # NEW: Pass terrain for transform-aware edges
edge_sample_spacing=edge_sample_spacing, # User-configurable density
boundary_winding=boundary_winding, # Pass winding direction for correct face normals
use_fractional_edges=use_fractional_edges, # NEW: Use fractional coords for projection curvature
scale_factor=scale_factor, # For fractional edge X,Y computation
model_offset=self.model_offset, # For fractional edge X,Y computation
)
# Handle return value (2-tuple for single-tier, 3-tuple for two-tier)
if two_tier_edge:
boundary_vertices, boundary_faces, boundary_colors = result
# Store boundary_colors separately for Blender vertex coloring
# Don't extend self.colors - it stays as 2D grid for surface vertices
# Boundary colors will be applied directly to mesh vertices after creation
self.boundary_colors = boundary_colors
else:
boundary_vertices, boundary_faces = result
self.boundary_colors = None
# Extend vertices with boundary vertices
vertices = np.vstack([positions, boundary_vertices])
# Add boundary faces to complete the mesh
faces.extend(boundary_faces)
else:
vertices = positions
# Store vertices and faces for later use (e.g., proximity calculations)
self.vertices = vertices
self.faces = faces
self.y_valid = y_valid
self.x_valid = x_valid
# Create the Blender mesh
try:
from src.terrain.blender_integration import create_blender_mesh
# Prepare colors if available
colors = self.colors if hasattr(self, "colors") else None
boundary_colors = self.boundary_colors if hasattr(self, "boundary_colors") else None
obj = create_blender_mesh(
vertices,
faces,
colors=colors,
y_valid=y_valid,
x_valid=x_valid,
boundary_colors=boundary_colors,
name="TerrainMesh",
logger=self.logger,
)
elapsed = time.time() - start_time
self.logger.info(f"Terrain mesh created successfully in {elapsed:.2f} seconds")
self.terrain_obj = obj
return obj
except Exception as e:
self.logger.error(f"Error creating terrain mesh: {str(e)}")
raise
def _sort_boundary_points_optimized(self, boundary_coords):
"""
Sort boundary points efficiently using spatial relationships.
Args:
boundary_coords: List of (y, x) coordinate tuples
Returns:
list: Sorted boundary points forming a continuous path
"""
from src.terrain.mesh_operations import sort_boundary_points
return sort_boundary_points(boundary_coords)
[docs]
def geo_to_mesh_coords(
self,
lon: np.ndarray | float,
lat: np.ndarray | float,
elevation_offset: float = 0.0,
input_crs: str = "EPSG:4326",
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""
Convert geographic coordinates to Blender mesh coordinates.
Transforms lon/lat points through the same pipeline used for the terrain mesh,
producing coordinates that align with the rendered terrain. Useful for placing
markers, labels, or other objects at geographic locations on the terrain.
Args:
lon: Longitude(s) or X coordinate(s). Can be a single float or array.
lat: Latitude(s) or Y coordinate(s). Can be a single float or array
(must match lon shape).
elevation_offset: Height above terrain surface in mesh units (default: 0.0).
Positive values place points above the terrain.
input_crs: CRS of input coordinates (default: "EPSG:4326" for WGS84 lon/lat).
Will be reprojected to match the transformed DEM's CRS if different.
Returns:
Tuple of (x, y, z) arrays in Blender mesh coordinates. If single values
were passed for lon/lat, returns single float values.
Raises:
RuntimeError: If create_mesh() has not been called yet (model_params not set).
ValueError: If the DEM layer has not been transformed yet.
Example:
>>> terrain = Terrain(dem, transform)
>>> terrain.add_transform(reproject_to_utm("EPSG:4326", "EPSG:32617"))
>>> terrain.apply_transforms()
>>> mesh = terrain.create_mesh()
>>> # Get mesh coords for a park at (-83.1, 42.4) in WGS84
>>> x, y, z = terrain.geo_to_mesh_coords(-83.1, 42.4, elevation_offset=0.1)
>>> # Create marker at (x, y, z) in Blender
"""
from pyproj import Transformer
# Check prerequisites
if not hasattr(self, "model_params") or self.model_params is None:
raise RuntimeError(
"create_mesh() must be called before geo_to_mesh_coords(). "
"The mesh parameters are needed for coordinate conversion."
)
dem_info = self.data_layers.get("dem", {})
if not dem_info.get("transformed", False):
raise ValueError(
"DEM layer must be transformed before geo_to_mesh_coords(). "
"Call apply_transforms() first."
)
# Get transformed DEM and its transform
dem_data = dem_info["transformed_data"]
transform = dem_info.get("transformed_transform")
dem_crs = dem_info.get("transformed_crs", "EPSG:4326")
if transform is None:
raise ValueError("No transform found for transformed DEM layer.")
# Convert to arrays for uniform handling
lon_arr = np.atleast_1d(np.asarray(lon, dtype=np.float64))
lat_arr = np.atleast_1d(np.asarray(lat, dtype=np.float64))
scalar_input = lon_arr.shape == (1,) and not hasattr(lon, "__len__")
if lon_arr.shape != lat_arr.shape:
raise ValueError(
f"lon and lat must have the same shape. Got {lon_arr.shape} and {lat_arr.shape}"
)
# Reproject coordinates if input CRS differs from DEM CRS
if input_crs != dem_crs:
self.logger.debug(f" Reprojecting {len(lon_arr)} points from {input_crs} to {dem_crs}")
transformer = Transformer.from_crs(input_crs, dem_crs, always_xy=True)
lon_arr, lat_arr = transformer.transform(lon_arr, lat_arr)
# Convert geographic coords to pixel coords using inverse transform
# Affine: (col, row) = ~transform * (x, y) where x=easting, y=northing
inv_transform = ~transform
cols, rows = inv_transform * (lon_arr, lat_arr)
# Round to nearest pixel
rows = np.round(rows).astype(int)
cols = np.round(cols).astype(int)
# Get DEM shape
height, width = dem_data.shape
# Clamp to valid range and track out-of-bounds points
valid_mask = (rows >= 0) & (rows < height) & (cols >= 0) & (cols < width)
rows_clamped = np.clip(rows, 0, height - 1)
cols_clamped = np.clip(cols, 0, width - 1)
# Get elevation values from DEM
elevations = dem_data[rows_clamped, cols_clamped]
# Handle out-of-bounds or NaN elevations
invalid_mask = ~valid_mask | np.isnan(elevations)
if np.any(invalid_mask):
# Use mean elevation for invalid points
valid_elevations = dem_data[~np.isnan(dem_data)]
mean_elev = np.mean(valid_elevations) if len(valid_elevations) > 0 else 0.0
elevations = np.where(invalid_mask, mean_elev, elevations)
self.logger.debug(
f" {np.sum(invalid_mask)} points outside DEM bounds, using mean elevation"
)
# Apply mesh scaling (same as generate_vertex_positions)
scale_factor = self.model_params["scale_factor"]
height_scale = self.model_params["height_scale"]
x = cols.astype(np.float64) / scale_factor
y = rows.astype(np.float64) / scale_factor
z = elevations * height_scale + elevation_offset
# Apply centering offset if model was centered
if self.model_params.get("centered", False):
offset = self.model_params["offset"]
x = x - offset[0]
y = y - offset[1]
# Note: z offset not applied - elevation_offset handles vertical positioning
# Return scalars if input was scalar
if scalar_input:
return float(x[0]), float(y[0]), float(z[0])
return x, y, z
[docs]
def compute_proximity_mask(
self,
lons: np.ndarray,
lats: np.ndarray,
radius_meters: float,
input_crs: str = "EPSG:4326",
cluster_threshold_meters: Optional[float] = None,
) -> np.ndarray:
"""
Create boolean mask for vertices within radius of geographic points.
Uses KDTree for efficient spatial queries to identify mesh vertices
near specified geographic locations (e.g., parks, POIs). Optionally
clusters nearby points first to create unified proximity zones.
Args:
lons: Array of longitudes for points of interest.
lats: Array of latitudes for points of interest (must match lons shape).
radius_meters: Radius in meters around each point/cluster to include.
input_crs: CRS of input coordinates (default: "EPSG:4326" for WGS84 lon/lat).
cluster_threshold_meters: Optional distance threshold for clustering nearby
points using DBSCAN. Points within this distance are merged into single
zones. If None, each point gets its own zone. Useful for merging nearby
parks into continuous zones.
Returns:
Boolean array of shape (num_vertices,) where True indicates vertex is
within radius of at least one point/cluster.
Raises:
RuntimeError: If create_mesh() has not been called yet.
ValueError: If lons and lats have different shapes.
Example:
>>> # Create zones around parks
>>> park_lons = np.array([-83.1, -83.2, -83.15])
>>> park_lats = np.array([42.4, 42.5, 42.45])
>>> mask = terrain.compute_proximity_mask(
... park_lons, park_lats,
... radius_meters=1000,
... cluster_threshold_meters=200 # Merge parks within 200m
... )
>>> # mask is True for vertices within 1km of park clusters
"""
from scipy.spatial import KDTree
# Check prerequisites
if not hasattr(self, "vertices") or self.vertices is None:
raise RuntimeError(
"create_mesh() must be called before compute_proximity_mask(). "
"Mesh vertices are needed for proximity calculations."
)
# Validate inputs
lons = np.asarray(lons)
lats = np.asarray(lats)
if lons.shape != lats.shape:
raise ValueError(f"lons and lats must have same shape. Got {lons.shape} and {lats.shape}")
# Filter out NaN coordinates (missing park locations, etc.)
valid_mask = ~(np.isnan(lons) | np.isnan(lats))
if not np.all(valid_mask):
num_invalid = np.sum(~valid_mask)
self.logger.warning(
f"Filtering out {num_invalid} points with NaN coordinates "
f"({len(lons) - num_invalid} valid points remaining)"
)
lons = lons[valid_mask]
lats = lats[valid_mask]
# Handle edge case: no valid points
if len(lons) == 0:
self.logger.warning("No valid points remaining after filtering NaN coordinates")
return np.zeros(len(self.vertices), dtype=bool)
# Convert geographic coords to mesh space (x, y only, ignore z)
xs, ys, _ = self.geo_to_mesh_coords(lons, lats, input_crs=input_crs)
point_coords = np.column_stack([xs, ys])
# Filter out points that ended up with NaN mesh coordinates (outside DEM bounds)
mesh_valid_mask = ~(np.isnan(point_coords[:, 0]) | np.isnan(point_coords[:, 1]))
if not np.all(mesh_valid_mask):
num_invalid = np.sum(~mesh_valid_mask)
self.logger.warning(
f"Filtering out {num_invalid} points outside DEM bounds "
f"({np.sum(mesh_valid_mask)} valid points remaining)"
)
point_coords = point_coords[mesh_valid_mask]
# Handle edge case: no valid points after mesh coordinate conversion
if len(point_coords) == 0:
self.logger.warning("No points within DEM bounds for proximity mask")
return np.zeros(len(self.vertices), dtype=bool)
self.logger.info(f"Computing proximity mask for {len(point_coords)} points...")
# Get pixel size from transformed DEM for metric conversions
dem_info = self.data_layers.get("dem", {})
transformed_transform = dem_info.get("transformed_transform")
if transformed_transform is None:
raise ValueError("Transformed DEM transform not found")
# Pixel size in meters (assumes metric CRS like UTM after transformation)
pixel_size_meters = abs(transformed_transform.a)
scale_factor = self.model_params["scale_factor"]
# Calculate meters per mesh unit
# 1 mesh unit = scale_factor pixels = scale_factor * pixel_size_meters
meters_per_mesh_unit = scale_factor * pixel_size_meters
self.logger.debug(
f" Pixel size: {pixel_size_meters:.2f}m, "
f"Scale factor: {scale_factor}, "
f"Meters per mesh unit: {meters_per_mesh_unit:.2f}m"
)
# Cluster nearby points using DBSCAN
if cluster_threshold_meters is not None:
from sklearn.cluster import DBSCAN
# Convert cluster threshold from meters to mesh units
cluster_threshold_mesh = cluster_threshold_meters / meters_per_mesh_unit
self.logger.info(
f" Clustering points with threshold {cluster_threshold_meters}m "
f"({cluster_threshold_mesh:.3f} mesh units)..."
)
clustering = DBSCAN(eps=cluster_threshold_mesh, min_samples=1)
labels = clustering.fit_predict(point_coords)
num_clusters = labels.max() + 1
# Use cluster centroids instead of individual points
point_coords = np.array(
[point_coords[labels == i].mean(axis=0) for i in range(num_clusters)]
)
self.logger.info(f" Clustered {len(lons)} points into {num_clusters} zones")
# Build KDTree for efficient spatial queries
tree = KDTree(point_coords)
# Get mesh vertex positions (just x, y for 2D distance)
# vertices includes boundary vertices if boundary_extension=True
mesh_verts_2d = self.vertices[:, :2]
# Convert radius from meters to mesh units
radius_mesh = radius_meters / meters_per_mesh_unit
self.logger.info(
f" Querying vertices within {radius_meters}m ({radius_mesh:.3f} mesh units) "
f"of {len(point_coords)} point(s)..."
)
# Query: which vertices are within radius of ANY point?
distances, _ = tree.query(mesh_verts_2d, k=1)
mask = distances <= radius_mesh
num_in_zone = np.sum(mask)
pct_in_zone = 100.0 * num_in_zone / len(mask)
self.logger.info(
f" Proximity mask: {num_in_zone}/{len(mask)} vertices ({pct_in_zone:.1f}%) in zones"
)
return mask
[docs]
def compute_proximity_mask_grid(
self,
lons: np.ndarray,
lats: np.ndarray,
radius_meters: float,
input_crs: str = "EPSG:4326",
cluster_threshold_meters: Optional[float] = None,
) -> np.ndarray:
"""
Create boolean mask for DEM grid cells within radius of geographic points.
Similar to compute_proximity_mask() but works directly on the transformed DEM
grid WITHOUT requiring mesh creation. Returns a 2D boolean array matching
the transformed DEM shape.
This is useful when you need to compute proximity zones before creating the
mesh, avoiding the need for duplicate mesh creation.
Args:
lons: Array of longitudes for points of interest.
lats: Array of latitudes for points of interest (must match lons shape).
radius_meters: Radius in meters around each point/cluster to include.
input_crs: CRS of input coordinates (default: "EPSG:4326" for WGS84 lon/lat).
cluster_threshold_meters: Optional distance threshold for clustering nearby
points using DBSCAN. If None, each point gets its own zone.
Returns:
Boolean array of shape (height, width) matching transformed DEM,
where True indicates grid cell is within radius of at least one point/cluster.
Raises:
ValueError: If DEM has not been transformed yet or if lons/lats mismatch.
Example:
>>> terrain = Terrain(dem, transform)
>>> terrain.add_transform(reproject_to_utm(...))
>>> terrain.apply_transforms()
>>> # Compute proximity BEFORE mesh creation
>>> park_mask = terrain.compute_proximity_mask_grid(
... park_lons, park_lats, radius_meters=1000
... )
>>> # Use mask in color computations, then create mesh once
"""
from scipy.spatial import KDTree
from pyproj import Transformer
# Check prerequisites
dem_info = self.data_layers.get("dem", {})
if not dem_info.get("transformed", False):
raise ValueError(
"DEM layer must be transformed before compute_proximity_mask_grid(). "
"Call apply_transforms() first."
)
# Validate inputs
lons = np.asarray(lons)
lats = np.asarray(lats)
if lons.shape != lats.shape:
raise ValueError(f"lons and lats must have same shape. Got {lons.shape} and {lats.shape}")
# Filter out NaN coordinates
valid_mask = ~(np.isnan(lons) | np.isnan(lats))
if not np.all(valid_mask):
num_invalid = np.sum(~valid_mask)
self.logger.warning(
f"Filtering out {num_invalid} points with NaN coordinates "
f"({len(lons) - num_invalid} valid points remaining)"
)
lons = lons[valid_mask]
lats = lats[valid_mask]
# Handle edge case: no valid points
dem_data = dem_info["transformed_data"]
if len(lons) == 0:
self.logger.warning("No valid points remaining after filtering NaN coordinates")
return np.zeros(dem_data.shape, dtype=bool)
# Get transformed DEM properties
transform = dem_info.get("transformed_transform")
dem_crs = dem_info.get("transformed_crs", "EPSG:4326")
if transform is None:
raise ValueError("No transform found for transformed DEM layer.")
# Reproject coordinates if input CRS differs from DEM CRS
if input_crs != dem_crs:
self.logger.debug(f" Reprojecting {len(lons)} points from {input_crs} to {dem_crs}")
transformer = Transformer.from_crs(input_crs, dem_crs, always_xy=True)
lons, lats = transformer.transform(lons, lats)
# Convert geographic coords to pixel coordinates
from rasterio.transform import rowcol
rows = []
cols = []
for lon, lat in zip(lons, lats):
row, col = rowcol(transform, lon, lat)
rows.append(row)
cols.append(col)
rows = np.array(rows)
cols = np.array(cols)
# Filter out points outside DEM bounds
height, width = dem_data.shape
valid_bounds = (rows >= 0) & (rows < height) & (cols >= 0) & (cols < width)
if not np.all(valid_bounds):
num_invalid = np.sum(~valid_bounds)
self.logger.warning(
f"Filtering out {num_invalid} points outside DEM bounds "
f"({np.sum(valid_bounds)} valid points remaining)"
)
rows = rows[valid_bounds]
cols = cols[valid_bounds]
# Handle edge case: no valid points after filtering
if len(rows) == 0:
self.logger.warning("No points within DEM bounds for proximity mask")
return np.zeros(dem_data.shape, dtype=bool)
self.logger.info(f"Computing grid-based proximity mask for {len(rows)} points...")
# Get pixel size in meters (assumes metric CRS like UTM after transformation)
pixel_size_meters = abs(transform.a)
# Convert radius from meters to pixels
radius_pixels = radius_meters / pixel_size_meters
# Stack point coordinates
point_coords = np.column_stack([rows, cols])
# Cluster nearby points using DBSCAN
if cluster_threshold_meters is not None:
from sklearn.cluster import DBSCAN
# Convert cluster threshold from meters to pixels
cluster_threshold_pixels = cluster_threshold_meters / pixel_size_meters
self.logger.info(
f" Clustering points with threshold {cluster_threshold_meters}m "
f"({cluster_threshold_pixels:.3f} pixels)..."
)
clustering = DBSCAN(eps=cluster_threshold_pixels, min_samples=1)
labels = clustering.fit_predict(point_coords)
num_clusters = labels.max() + 1
# Use cluster centroids instead of individual points
point_coords = np.array(
[point_coords[labels == i].mean(axis=0) for i in range(num_clusters)]
)
self.logger.info(f" Clustered {len(lons)} points into {num_clusters} zones")
# Build KDTree for efficient spatial queries
tree = KDTree(point_coords)
# Create grid of all pixel coordinates
row_indices, col_indices = np.indices(dem_data.shape)
grid_coords = np.column_stack([row_indices.ravel(), col_indices.ravel()])
# Query: which pixels are within radius of ANY point?
self.logger.info(
f" Querying {len(grid_coords)} pixels within {radius_meters}m "
f"({radius_pixels:.3f} pixels) of {len(point_coords)} point(s)..."
)
distances, _ = tree.query(grid_coords, k=1)
mask_flat = distances <= radius_pixels
# Reshape to grid
mask = mask_flat.reshape(dem_data.shape)
num_in_zone = np.sum(mask)
total_pixels = mask.size
pct_in_zone = 100.0 * num_in_zone / total_pixels
self.logger.info(
f" Proximity mask: {num_in_zone}/{total_pixels} pixels ({pct_in_zone:.1f}%) in zones"
)
return mask
[docs]
def compute_ring_mask_grid(
self,
lons: np.ndarray,
lats: np.ndarray,
inner_radius_meters: float,
outer_radius_meters: float,
input_crs: str = "EPSG:4326",
cluster_threshold_meters: Optional[float] = None,
) -> np.ndarray:
"""
Create boolean mask for ring (annulus) around geographic points.
Creates a ring-shaped mask where pixels are True if they are:
- Within outer_radius_meters of a point, AND
- Outside inner_radius_meters of all points
This is useful for creating visual outlines around areas of interest.
Args:
lons: Array of longitudes for points of interest.
lats: Array of latitudes for points of interest (must match lons shape).
inner_radius_meters: Inner radius of the ring in meters. Pixels closer
than this to any point are excluded. Use 0 for filled circle.
outer_radius_meters: Outer radius of the ring in meters. Pixels farther
than this from all points are excluded.
input_crs: CRS of input coordinates (default: "EPSG:4326" for WGS84 lon/lat).
cluster_threshold_meters: Optional distance threshold for clustering nearby
points using DBSCAN. If None, each point gets its own ring.
Returns:
Boolean array of shape (height, width) matching transformed DEM,
where True indicates grid cell is within the ring zone.
Raises:
ValueError: If inner_radius > outer_radius or if radii are negative.
Example:
>>> # Create a 100m-wide ring at 2km from each park
>>> ring_mask = terrain.compute_ring_mask_grid(
... park_lons, park_lats,
... inner_radius_meters=1900,
... outer_radius_meters=2000,
... )
"""
from scipy.spatial import KDTree
from pyproj import Transformer
# Validate radii
if inner_radius_meters < 0:
raise ValueError(f"inner_radius_meters must be >= 0, got {inner_radius_meters}")
if outer_radius_meters < 0:
raise ValueError(f"outer_radius_meters must be >= 0, got {outer_radius_meters}")
if inner_radius_meters > outer_radius_meters:
raise ValueError(
f"inner_radius_meters ({inner_radius_meters}) must be <= "
f"outer_radius_meters ({outer_radius_meters})"
)
# Check prerequisites
dem_info = self.data_layers.get("dem", {})
if not dem_info.get("transformed", False):
# For tests with mock terrain, check for _transformed_dem attribute
if hasattr(self, "_transformed_dem") and self._transformed_dem is not None:
dem_data = self._transformed_dem
transform = getattr(self, "_transformed_transform", None)
dem_crs = getattr(self, "_transformed_crs", "EPSG:4326")
else:
raise ValueError(
"DEM layer must be transformed before compute_ring_mask_grid(). "
"Call apply_transforms() first."
)
else:
dem_data = dem_info["transformed_data"]
transform = dem_info.get("transformed_transform")
dem_crs = dem_info.get("transformed_crs", "EPSG:4326")
# Validate inputs
lons = np.asarray(lons)
lats = np.asarray(lats)
if lons.shape != lats.shape:
raise ValueError(f"lons and lats must have same shape. Got {lons.shape} and {lats.shape}")
# Handle edge case: no points
if len(lons) == 0:
self.logger.debug("No points provided for ring mask - returning empty mask")
return np.zeros(dem_data.shape, dtype=bool)
# Filter out NaN coordinates
valid_mask = ~(np.isnan(lons) | np.isnan(lats))
if not np.all(valid_mask):
num_invalid = np.sum(~valid_mask)
self.logger.warning(
f"Filtering out {num_invalid} points with NaN coordinates"
)
lons = lons[valid_mask]
lats = lats[valid_mask]
if len(lons) == 0:
return np.zeros(dem_data.shape, dtype=bool)
# If no transform, use simple grid coordinates (for testing)
if transform is None:
# Assume coordinates are already in grid space (0-1 normalized)
height, width = dem_data.shape
rows = (lats * height).astype(int)
cols = (lons * width).astype(int)
pixel_size_meters = 30.0 # Assume 30m pixels for testing
else:
# Reproject coordinates if input CRS differs from DEM CRS
if input_crs != dem_crs:
self.logger.debug(f" Reprojecting points from {input_crs} to {dem_crs}")
transformer = Transformer.from_crs(input_crs, dem_crs, always_xy=True)
lons, lats = transformer.transform(lons, lats)
# Convert geographic coords to pixel coordinates
from rasterio.transform import rowcol
rows = []
cols = []
for lon, lat in zip(lons, lats):
row, col = rowcol(transform, lon, lat)
rows.append(row)
cols.append(col)
rows = np.array(rows)
cols = np.array(cols)
# Get pixel size in meters
pixel_size_meters = abs(transform.a)
# Filter out points outside DEM bounds
height, width = dem_data.shape
valid_bounds = (rows >= 0) & (rows < height) & (cols >= 0) & (cols < width)
if not np.all(valid_bounds):
rows = rows[valid_bounds]
cols = cols[valid_bounds]
if len(rows) == 0:
return np.zeros(dem_data.shape, dtype=bool)
self.logger.info(f"Computing ring mask for {len(rows)} points...")
# Convert radii from meters to pixels
inner_radius_pixels = inner_radius_meters / pixel_size_meters
outer_radius_pixels = outer_radius_meters / pixel_size_meters
# Stack point coordinates
point_coords = np.column_stack([rows, cols])
# Cluster nearby points using DBSCAN if requested
if cluster_threshold_meters is not None:
from sklearn.cluster import DBSCAN
cluster_threshold_pixels = cluster_threshold_meters / pixel_size_meters
clustering = DBSCAN(eps=cluster_threshold_pixels, min_samples=1)
labels = clustering.fit_predict(point_coords)
num_clusters = labels.max() + 1
# Use cluster centroids
point_coords = np.array(
[point_coords[labels == i].mean(axis=0) for i in range(num_clusters)]
)
self.logger.info(f" Clustered into {num_clusters} zones")
# Build KDTree for efficient spatial queries
tree = KDTree(point_coords)
# Create grid of all pixel coordinates
row_indices, col_indices = np.indices(dem_data.shape)
grid_coords = np.column_stack([row_indices.ravel(), col_indices.ravel()])
# Query distances to nearest point
distances, _ = tree.query(grid_coords, k=1)
# Create ring mask: within outer but outside inner
# Use strict inequality for inner so inner_radius=0 gives filled circle
outer_mask = distances <= outer_radius_pixels
inner_mask = distances < inner_radius_pixels # Strictly less than
ring_mask_flat = outer_mask & ~inner_mask
# Reshape to grid
ring_mask = ring_mask_flat.reshape(dem_data.shape)
num_in_ring = np.sum(ring_mask)
self.logger.info(
f" Ring mask: {num_in_ring} pixels "
f"(ring width: {outer_radius_meters - inner_radius_meters}m)"
)
return ring_mask
[docs]
def apply_ring_color(
self,
ring_mask: np.ndarray,
ring_color: tuple = (0.1, 0.1, 0.1),
) -> None:
"""
Apply a solid color to vertices within the ring mask.
This modifies self.colors in-place, setting the RGB values for vertices
that fall within the ring mask to the specified color.
Args:
ring_mask: 2D boolean array matching DEM shape. True = apply ring color.
ring_color: RGB tuple (0-1 range) for ring color. Default: dark gray.
Raises:
ValueError: If ring_mask shape doesn't match DEM or colors not initialized.
"""
if not hasattr(self, "colors") or self.colors is None:
raise ValueError("Colors must be computed before applying ring color")
if not hasattr(self, "y_valid") or not hasattr(self, "x_valid"):
raise ValueError("Vertex coordinates (y_valid, x_valid) must be set")
# Apply ring color to vertices within the mask
for i, (y, x) in enumerate(zip(self.y_valid, self.x_valid)):
# Clamp to valid indices
y_idx = int(np.clip(y, 0, ring_mask.shape[0] - 1))
x_idx = int(np.clip(x, 0, ring_mask.shape[1] - 1))
if ring_mask[y_idx, x_idx]:
self.colors[i, 0] = ring_color[0]
self.colors[i, 1] = ring_color[1]
self.colors[i, 2] = ring_color[2]
# Keep alpha unchanged