Source code for src.terrain.visualization.bounds_pipeline

"""
Transformation pipeline for bounds visualization.

Encapsulates the multi-stage transformation from WGS84 original DEM through
reprojection, flipping, and downsampling to final mesh grid.

IMPORTANT: The WGS84 → UTM transformation is NON-LINEAR (Transverse Mercator
projection involves trigonometric relationships). A rectangular boundary in
WGS84 becomes CURVED when projected to UTM. This module uses pyproj to handle
the proper projection math, not linear approximations.

This module provides reusable components for both visualization and testing.
"""

import numpy as np
from typing import Tuple, List, Optional

try:
    from pyproj import Transformer as PyProjTransformer
    HAS_PYPROJ = True
except ImportError:
    HAS_PYPROJ = False


[docs] class SimpleAffine: """Minimal affine transform for mapping between coordinate spaces. Represents the affine transform equation: x_world = c + pixel_x * a + pixel_y * b y_world = f + pixel_x * d + pixel_y * e Where (c, f) is the top-left corner, (a, e) are pixel scales, and (b, d) handle rotation/skew. """
[docs] def __init__(self, c: float, d: float, e: float, f: float, a: float, b: float): """Initialize affine transform coefficients. Args: c: X-coordinate of top-left corner (easting/longitude) d: Pixel spacing in x direction for y (usually 0 for aligned grid) e: Y-coordinate scale (usually negative for top-to-bottom scanning) f: Y-coordinate of top-left corner (northing/latitude) a: Pixel spacing in x direction (meters or degrees per pixel) b: Pixel spacing in y direction for x (usually 0 for aligned grid) """ self.c = c self.d = d self.e = e self.f = f self.a = a self.b = b
[docs] def map_pixel_to_world(self, y: int, x: int) -> Tuple[float, float]: """Map grid pixel coordinates to world coordinates. Args: y: Row index (0 = top) x: Column index (0 = left) Returns: Tuple of (world_x, world_y) coordinates """ world_x = self.c + x * self.a + y * self.b world_y = self.f + x * self.d + y * self.e return world_x, world_y
[docs] class TransformationPipeline: """Manages the transformation pipeline from WGS84 to final mesh grid. Pipeline stages: 1. Original: WGS84 coordinates, full resolution 2. Reprojected: UTM, distorted by NON-LINEAR projection (Transverse Mercator) 3. Flipped: Same shape as reprojected, but horizontally flipped 4. Final: Downsampled mesh grid (20×49 pixels for Detroit) IMPORTANT: The WGS84 → UTM transformation is non-linear. A rectangular boundary in WGS84 becomes curved in UTM space due to the Transverse Mercator projection's trigonometric relationships. """
[docs] def __init__( self, original_shape: Tuple[int, int], distortion_factor: float, target_vertices: int, bbox_wgs84: Optional[Tuple[float, float, float, float]] = None, bbox_utm: Optional[Tuple[float, float, float, float]] = None, src_crs: str = "EPSG:4326", dst_crs: str = "EPSG:32617", ): """Initialize transformation pipeline. Args: original_shape: (height, width) of original DEM in WGS84 distortion_factor: Height compression ratio from projection (e.g., 0.87) Used for shape estimation; actual projection uses pyproj target_vertices: Target number of vertices for final mesh bbox_wgs84: Optional (min_lon, min_lat, max_lon, max_lat) in WGS84 bbox_utm: Optional (min_easting, min_northing, max_easting, max_northing) in UTM src_crs: Source coordinate reference system (default: EPSG:4326 / WGS84) dst_crs: Destination coordinate reference system (default: EPSG:32617 / UTM 17N) """ self.original_shape = original_shape self.distortion_factor = distortion_factor self.target_vertices = target_vertices self.bbox_wgs84 = bbox_wgs84 self.bbox_utm = bbox_utm self.src_crs = src_crs self.dst_crs = dst_crs # Compute derived shapes self._compute_shapes()
def _compute_shapes(self) -> None: """Compute grid shapes at each transformation stage. Note: The distortion_factor is used to estimate the reprojected shape. This is an approximation - the actual WGS84→UTM transformation is non-linear and a WGS84 rectangle becomes a curved quadrilateral in UTM space. """ height, width = self.original_shape # Stage 2: Reprojection distortion (estimated) # Add 1 to ensure boundary pixels aren't filtered at exact edge self.reprojected_shape = ( int(height * self.distortion_factor) + 1, int(width / self.distortion_factor) + 1, ) # Stage 3: Flipped (shape unchanged) self.flipped_shape = self.reprojected_shape # Stage 4: Final mesh grid downsampling rep_height, rep_width = self.reprojected_shape aspect_ratio = rep_height / rep_width # Solve: height * width = target_vertices, height/width = aspect_ratio final_height = int(np.sqrt(self.target_vertices * aspect_ratio)) final_width = int(np.sqrt(self.target_vertices / aspect_ratio)) self.final_shape = (final_height, final_width) # Downsampling factors (from reprojected to final) self.downsample_factors = ( rep_height / final_height, rep_width / final_width, )
[docs] def get_shape(self, stage: str) -> Tuple[int, int]: """Get grid shape at a transformation stage. Args: stage: One of 'original', 'reprojected', 'flipped', 'final' Returns: (height, width) tuple for that stage """ if stage == 'original': return self.original_shape elif stage == 'reprojected': return self.reprojected_shape elif stage == 'flipped': return self.flipped_shape elif stage == 'final': return self.final_shape else: raise ValueError(f"Unknown stage: {stage}")
[docs] def get_affine(self, stage: str) -> SimpleAffine: """Get affine transform for a stage. Args: stage: One of 'original', 'reprojected', 'final' Returns: SimpleAffine transform for mapping pixels to world coordinates """ if stage == 'original': if self.bbox_wgs84 is None: raise ValueError("bbox_wgs84 required to compute original affine") min_lon, min_lat, max_lon, max_lat = self.bbox_wgs84 height, width = self.original_shape scale_x = (max_lon - min_lon) / width scale_y = -(max_lat - min_lat) / height return SimpleAffine( c=min_lon, f=max_lat, a=scale_x, e=scale_y, b=0, d=0 ) elif stage == 'reprojected': if self.bbox_utm is None: raise ValueError("bbox_utm required to compute reprojected affine") min_easting, min_northing, max_easting, max_northing = self.bbox_utm height, width = self.reprojected_shape scale_x = (max_easting - min_easting) / width scale_y = -(max_northing - min_northing) / height return SimpleAffine( c=min_easting, f=max_northing, a=scale_x, e=scale_y, b=0, d=0 ) else: raise ValueError(f"No affine for stage: {stage}")
[docs] class EdgeTransformer: """Transforms edge pixels through the transformation pipeline. Applies the sequence of transformations (reprojection, flip, downsample) to edge pixel coordinates. IMPORTANT: The WGS84 → UTM reprojection uses pyproj for proper non-linear Transverse Mercator projection. This means rectangular edges in WGS84 become curved in UTM space. """
[docs] def __init__(self, pipeline: TransformationPipeline, use_pyproj: bool = True): """Initialize with a transformation pipeline. Args: pipeline: TransformationPipeline instance use_pyproj: If True, use pyproj for proper non-linear projection. If False, fall back to linear approximation (for testing). """ self.pipeline = pipeline self.use_pyproj = use_pyproj and HAS_PYPROJ # Create pyproj transformer if available and enabled self._proj_transformer = None if self.use_pyproj and pipeline.bbox_wgs84 is not None: self._proj_transformer = PyProjTransformer.from_crs( pipeline.src_crs, pipeline.dst_crs, always_xy=True )
[docs] def transform_stage( self, pixels: List[Tuple[int, int]], from_stage: str, to_stage: str, ) -> List[Tuple[int, int]]: """Transform pixels from one stage to the next. Args: pixels: List of (y, x) pixel coordinates from_stage: Source stage name to_stage: Destination stage name Returns: List of (y, x) coordinates in destination stage """ # Map stage transitions to transformation methods stage_order = ['original', 'reprojected', 'flipped', 'final'] from_idx = stage_order.index(from_stage) to_idx = stage_order.index(to_stage) if from_idx >= to_idx: raise ValueError(f"Cannot transform backward: {from_stage}{to_stage}") result = pixels for i in range(from_idx, to_idx): current_stage = stage_order[i] next_stage = stage_order[i + 1] result = self._apply_single_transform(result, current_stage, next_stage) return result
def _apply_single_transform( self, pixels: List[Tuple[int, int]], from_stage: str, to_stage: str, ) -> List[Tuple[int, int]]: """Apply a single transformation step. Args: pixels: Input pixel coordinates from_stage: Source stage to_stage: Destination stage Returns: Transformed pixel coordinates """ if from_stage == 'original' and to_stage == 'reprojected': # Apply projection distortion return self._transform_original_to_reprojected(pixels) elif from_stage == 'reprojected' and to_stage == 'flipped': # Apply horizontal flip return self._transform_reprojected_to_flipped(pixels) elif from_stage == 'flipped' and to_stage == 'final': # Apply downsampling return self._transform_flipped_to_final(pixels) else: raise ValueError(f"Unknown transformation: {from_stage}{to_stage}") def _transform_original_to_reprojected(self, pixels: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """Apply WGS84 → UTM projection (non-linear Transverse Mercator). If pyproj is available and bbox_wgs84/bbox_utm are provided, uses proper coordinate transformation. Otherwise falls back to linear approximation. """ # Use proper projection if available if self._proj_transformer is not None and self.pipeline.bbox_wgs84 is not None: return self._transform_with_pyproj(pixels) # Fallback: linear approximation (for backward compatibility) distortion_factor = self.pipeline.distortion_factor return [ (int(y * distortion_factor), int(x / distortion_factor)) for y, x in pixels ] def _transform_with_pyproj(self, pixels: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """Transform pixels using proper coordinate projection via pyproj. Flow: pixel → WGS84 geographic → UTM geographic → pixel in reprojected grid This captures the NON-LINEAR behavior of the Transverse Mercator projection, where straight lines in WGS84 become curved in UTM. Note: A WGS84 rectangle becomes a curved quadrilateral in UTM space. Edge pixels may map to positions that don't align with the rectangular reprojected grid boundaries. """ if self.pipeline.bbox_wgs84 is None or self.pipeline.bbox_utm is None: raise ValueError("bbox_wgs84 and bbox_utm required for pyproj transformation") # Get affines for pixel ↔ geographic coordinate conversion affine_wgs84 = self.pipeline.get_affine('original') affine_utm = self.pipeline.get_affine('reprojected') rep_h, rep_w = self.pipeline.reprojected_shape result = [] for y_orig, x_orig in pixels: # 1. Pixel → WGS84 geographic coordinates lon, lat = affine_wgs84.map_pixel_to_world(y_orig, x_orig) # 2. WGS84 → UTM (non-linear Transverse Mercator projection!) easting, northing = self._proj_transformer.transform(lon, lat) # 3. UTM geographic → pixel in reprojected grid # Need to invert the UTM affine: pixel = (geo - offset) / scale x_reproj = (easting - affine_utm.c) / affine_utm.a y_reproj = (northing - affine_utm.f) / affine_utm.e # Round to integer pixel coordinates y_int, x_int = int(round(y_reproj)), int(round(x_reproj)) # Clamp to valid bounds (preserves edge pixels that land near boundary) y_int = max(0, min(rep_h - 1, y_int)) x_int = max(0, min(rep_w - 1, x_int)) result.append((y_int, x_int)) return result def _transform_with_pyproj_fractional(self, pixels: List[Tuple[int, int]]) -> List[Tuple[float, float]]: """Transform pixels using pyproj, returning FRACTIONAL coordinates without clamping. This preserves the true curved boundary from the non-linear projection, even when coordinates extend beyond the rectangular grid bounds. Returns: List of (y, x) tuples with fractional precision, unclamped. """ if self.pipeline.bbox_wgs84 is None or self.pipeline.bbox_utm is None: raise ValueError("bbox_wgs84 and bbox_utm required for pyproj transformation") affine_wgs84 = self.pipeline.get_affine('original') affine_utm = self.pipeline.get_affine('reprojected') result = [] for y_orig, x_orig in pixels: # 1. Pixel → WGS84 geographic coordinates lon, lat = affine_wgs84.map_pixel_to_world(y_orig, x_orig) # 2. WGS84 → UTM (non-linear Transverse Mercator projection!) easting, northing = self._proj_transformer.transform(lon, lat) # 3. UTM geographic → fractional pixel in reprojected grid (NO clamping) x_reproj = (easting - affine_utm.c) / affine_utm.a y_reproj = (northing - affine_utm.f) / affine_utm.e result.append((y_reproj, x_reproj)) return result def _transform_reprojected_to_flipped(self, pixels: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """Apply horizontal flip (mirror left-right).""" _, width = self.pipeline.reprojected_shape return [ (y, width - 1 - x) for y, x in pixels ] def _transform_flipped_to_final(self, pixels: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """Downsample to final mesh grid (integer coordinates). Uses clamping instead of filtering to preserve edge pixels that land at or slightly beyond the grid boundary (due to rounding). Note: For smooth edge extrusion, use transform_to_mesh_space() instead to get fractional coordinates. """ downsample_y, downsample_x = self.pipeline.downsample_factors final_height, final_width = self.pipeline.final_shape result = [] for y, x in pixels: # Downsample with rounding y_final = int(np.round(y / downsample_y)) x_final = int(np.round(x / downsample_x)) # Clamp to valid grid bounds (preserves boundary pixels) y_final = max(0, min(final_height - 1, y_final)) x_final = max(0, min(final_width - 1, x_final)) result.append((y_final, x_final)) # Remove duplicates created by rounding/clamping return list(set(result))
[docs] def transform_to_mesh_space( self, pixels: List[Tuple[int, int]], from_stage: str = 'original', clamp: bool = True, ) -> List[Tuple[float, float]]: """Transform edge pixels to mesh coordinate space with FRACTIONAL precision. Unlike transform_full_pipeline() which snaps to integer grid cells, this method preserves fractional coordinates for smooth edge extrusion. Args: pixels: Edge pixel coordinates (y, x) in the source stage from_stage: Source stage ('original', 'reprojected', or 'flipped') clamp: If True, clamp coordinates to [0, dim-1]. If False, return true projected coordinates which may exceed grid bounds due to non-linear projection curvature. Returns: List of (y, x) coordinates in mesh space with fractional precision. If clamp=True, values range from 0.0 to (height-1) and 0.0 to (width-1). If clamp=False, values may exceed these bounds. """ downsample_y, downsample_x = self.pipeline.downsample_factors final_height, final_width = self.pipeline.final_shape _, rep_width = self.pipeline.reprojected_shape # When clamp=False and from_stage='original', use fractional pyproj to preserve # the true curved boundary from the non-linear projection if not clamp and from_stage == 'original' and self._proj_transformer is not None: # Get fractional reprojected coordinates (preserves curvature) reproj_frac = self._transform_with_pyproj_fractional(pixels) # Apply flip (mirror x around center) flipped_frac = [(y, rep_width - 1 - x) for y, x in reproj_frac] # Scale to mesh space result = [] for y, x in flipped_frac: y_mesh = y / downsample_y x_mesh = x / downsample_x result.append((y_mesh, x_mesh)) return result # Standard path: use integer transformation (with clamping at intermediate stages) if from_stage == 'original': flipped = self.transform_stage(pixels, 'original', 'flipped') elif from_stage == 'reprojected': flipped = self.transform_stage(pixels, 'reprojected', 'flipped') elif from_stage == 'flipped': flipped = pixels else: raise ValueError(f"Unknown stage: {from_stage}") result = [] for y, x in flipped: # Scale to mesh coordinates (fractional) y_mesh = y / downsample_y x_mesh = x / downsample_x if clamp: # Clamp to valid range [0, dim-1] y_mesh = max(0.0, min(float(final_height - 1), y_mesh)) x_mesh = max(0.0, min(float(final_width - 1), x_mesh)) result.append((y_mesh, x_mesh)) return result
[docs] def transform_full_pipeline(self, pixels: List[Tuple[int, int]]) -> List[Tuple[int, int]]: """Apply full transformation from original to final. Args: pixels: Edge pixel coordinates in original WGS84 space Returns: Edge pixels in final mesh grid coordinates """ return self.transform_stage(pixels, 'original', 'final')