# Source code for src.terrain.transforms

"""
Raster transformation operations for terrain processing.

This module contains functions for transforming raster data including
downsampling, smoothing, flipping, and elevation scaling.
"""

import logging
import numpy as np
from scipy.ndimage import zoom
from scipy import ndimage
from rasterio.warp import calculate_default_transform, reproject, Resampling
from rasterio import Affine
import rasterio


def downsample_raster(zoom_factor=0.1, method="average", nodata_value=np.nan):
    """
    Create a raster downsampling transform function with specified parameters.

    Args:
        zoom_factor: Scaling factor for downsampling (default: 0.1)
        method: Downsampling method (default: "average")
            - "average": Area averaging - best for DEMs, no overshoot
            - "lanczos": Lanczos resampling - sharp, minimal aliasing
            - "cubic": Cubic spline interpolation
            - "bilinear": Bilinear interpolation - safe fallback
        nodata_value: Value to treat as no data (default: np.nan)

    Returns:
        function: A transform function that downsamples raster data.
        The returned function raises ValueError for an unknown ``method``.
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """
        Downsample raster data using the specified method.

        Args:
            raster_data: Input raster numpy array
            transform: Optional affine transform

        Returns:
            tuple: (downsampled_data, new_transform, None)
        """
        logger.info(f"Downsampling raster by factor {zoom_factor} using {method}")

        # Detect nodata mask (don't copy data yet). NaN sentinels need
        # isnan() because NaN != NaN under equality.
        if np.isnan(nodata_value):
            mask = np.isnan(raster_data)
        else:
            mask = raster_data == nodata_value
        has_nodata = np.any(mask)

        # Calculate output shape; never collapse a dimension below 1 pixel.
        out_shape = (
            max(1, int(raster_data.shape[0] * zoom_factor)),
            max(1, int(raster_data.shape[1] * zoom_factor)),
        )

        if method == "average":
            # Area averaging - best for DEMs, no upfront copy needed.
            downsampled = _downsample_average(
                raster_data, out_shape, mask if has_nodata else None
            )
            # BUG FIX: _downsample_average marks nodata cells with NaN. When
            # the caller uses a non-NaN sentinel, the old code never restored
            # it (the restore step below skipped "average"), leaving NaN in
            # the output. Convert back so all methods behave consistently.
            if has_nodata and not np.isnan(nodata_value):
                downsampled[np.isnan(downsampled)] = nodata_value
        else:
            # All interpolating methods share the same preparation: promote
            # to float64 and blank nodata as NaN (previously duplicated in
            # three branches).
            processed_data = raster_data.astype(np.float64)
            if has_nodata:
                processed_data[mask] = np.nan

            if method == "lanczos":
                downsampled = _downsample_lanczos(processed_data, out_shape)
            elif method == "cubic":
                downsampled = zoom(
                    processed_data, zoom=zoom_factor, order=3, prefilter=True, mode="reflect"
                )
            elif method == "bilinear":
                downsampled = zoom(
                    processed_data, zoom=zoom_factor, order=1, prefilter=False, mode="reflect"
                )
            else:
                raise ValueError(f"Unknown downsampling method: {method}")

            # Restore nodata values on the (much smaller) downsampled grid.
            if has_nodata:
                from skimage.transform import resize

                mask_downsampled = resize(
                    mask.astype(float), downsampled.shape, order=0, preserve_range=True
                )
                downsampled[mask_downsampled > 0.5] = nodata_value

        # Update transform if provided: pixel size grows by 1/zoom_factor.
        if transform is not None:
            new_transform = transform * transform.scale(1 / zoom_factor)
        else:
            new_transform = None

        logger.info(f"Original shape: {raster_data.shape}")
        logger.info(f"Downsampled shape: {downsampled.shape}")
        logger.info(f"Value range: {np.nanmin(downsampled):.2f} to {np.nanmax(downsampled):.2f}")

        # Return 3-tuple for consistency with transform pipeline
        return downsampled, new_transform, None

    # Set descriptive name for logging
    transform.__name__ = f"downsample({zoom_factor:.3f}, {method})"
    return transform
def downsample_raster_optimized(zoom_factor=0.1, method="average", nodata_value=np.nan):
    """
    Create an optimized raster downsampling transform using two-pass for large compression.

    For large compression ratios (>100:1), uses two-pass downsampling to reduce
    memory bandwidth and improve cache efficiency. For smaller ratios, uses single-pass.

    Args:
        zoom_factor: Scaling factor for downsampling (default: 0.1)
        method: Downsampling method (default: "average")
            - "average": Area averaging - best for DEMs, no overshoot
            - "lanczos": Lanczos resampling - sharp, minimal aliasing
            - "cubic": Cubic spline interpolation
            - "bilinear": Bilinear interpolation - safe fallback
        nodata_value: Value to treat as no data (default: np.nan)

    Returns:
        function: A transform function that downsamples raster data
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """
        Downsample raster data, using two-pass for large compression ratios.

        Args:
            raster_data: Input raster numpy array
            transform: Optional affine transform

        Returns:
            tuple: (downsampled_data, new_transform, None)
        """
        # Note: compression ratio reduces to 1/zoom_factor**2; it depends only
        # on the zoom factor, not on the raster size.
        input_pixels = np.prod(raster_data.shape)
        output_pixels = input_pixels * (zoom_factor ** 2)
        compression_ratio = input_pixels / output_pixels
        logger.info(
            f"Downsampling {input_pixels:,} pixels to {output_pixels:,.0f} pixels "
            f"(compression {compression_ratio:.1f}:1)"
        )

        # Use two-pass for large compression ratios (>100:1)
        if compression_ratio > 100:
            logger.info("Using two-pass downsampling for large compression ratio")

            # First pass compresses by ~1/sqrt(ratio/10) (~0.316 at 100:1),
            # capped at 0.5 so the intermediate raster is at most half-resolution.
            zoom_first = 1 / np.sqrt(compression_ratio / 10)
            zoom_first = min(zoom_first, 0.5)
            zoom_second = zoom_factor / zoom_first

            logger.info(f"Two-pass: {zoom_first:.3f} → {zoom_second:.3f}")

            # Build two single-pass transforms and chain them.
            downsample_first = downsample_raster(zoom_first, method, nodata_value)
            downsample_second = downsample_raster(zoom_second, method, nodata_value)

            data_intermediate, transform_intermediate, _ = downsample_first(
                raster_data, transform
            )
            result, final_transform, _ = downsample_second(
                data_intermediate, transform_intermediate
            )
            return result, final_transform, None
        else:
            # Single-pass for small compression ratios
            logger.info("Using single-pass downsampling")
            downsample = downsample_raster(zoom_factor, method, nodata_value)
            return downsample(raster_data, transform)

    # Set descriptive name for logging
    transform.__name__ = f"downsample_opt({zoom_factor:.3f}, {method})"
    return transform


def _downsample_average(data: np.ndarray, out_shape: tuple, mask: np.ndarray = None) -> np.ndarray:
    """
    Downsample using area averaging (mean of source pixels in each target cell).

    This is the most physically accurate method for DEMs - it preserves the
    mean elevation and doesn't create values outside the original range.
    Uses PyTorch's F.interpolate with mode='area' for true area averaging.

    Args:
        data: Input raster array (any dtype, will be converted to float)
        out_shape: Target output shape (height, width)
        mask: Optional boolean mask where True = nodata

    Returns:
        Downsampled float array; nodata cells are marked with NaN.
    """
    import torch
    import torch.nn.functional as F

    # PyTorch requires C-contiguous arrays (no negative strides from flipud/fliplr)
    if not data.flags['C_CONTIGUOUS']:
        data = np.ascontiguousarray(data)

    # Convert to torch tensor: (H, W) -> (1, 1, H, W)
    tensor = torch.from_numpy(data).float().unsqueeze(0).unsqueeze(0)

    # Blank nodata as NaN before averaging.
    # NOTE(review): F.interpolate(mode="area") propagates NaN into any output
    # cell whose footprint touches a NaN input (it does not skip them); the
    # mask-based restore below re-stamps the nodata region afterwards.
    if mask is not None:
        tensor[0, 0, mask] = float('nan')

    # Use GPU if available
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tensor = tensor.to(device)

    # Area interpolation - true area averaging, O(n) regardless of scale
    result = F.interpolate(tensor, size=out_shape, mode="area")

    # Convert back to numpy
    downsampled = result.squeeze().cpu().numpy()

    # Restore nodata on the downsampled result: resizing the small output-side
    # mask is far cheaper than resampling data at full resolution.
    if mask is not None:
        from skimage.transform import resize
        mask_downsampled = resize(
            mask.astype(float), downsampled.shape, order=0, preserve_range=True
        )
        downsampled[mask_downsampled > 0.5] = np.nan

    return downsampled


def _downsample_lanczos(data: np.ndarray, out_shape: tuple) -> np.ndarray:
    """
    Downsample using Lanczos resampling (sharp with good anti-aliasing).

    Lanczos provides sharper results than area averaging while still handling
    aliasing well. Good for preserving terrain detail.

    Args:
        data: Input float raster (NaN marks nodata)
        out_shape: Target output shape (height, width)

    Returns:
        Downsampled float64 array; NaN cells are preserved (PIL path) or
        interpolated through (skimage fallback).
    """
    try:
        # Try PIL/Pillow first (best Lanczos implementation)
        from PIL import Image

        # Handle NaN by temporarily replacing with mean
        nan_mask = np.isnan(data)
        if np.any(nan_mask):
            data_filled = data.copy()
            data_filled[nan_mask] = np.nanmean(data)
        else:
            data_filled = data

        # Convert to PIL Image (PIL sizes are (width, height))
        img = Image.fromarray(data_filled.astype(np.float32))
        resized = img.resize((out_shape[1], out_shape[0]), Image.Resampling.LANCZOS)
        result = np.array(resized, dtype=np.float64)

        # Restore NaN via a nearest-neighbour resized mask
        if np.any(nan_mask):
            mask_resized = np.array(
                Image.fromarray(nan_mask.astype(np.float32)).resize(
                    (out_shape[1], out_shape[0]), Image.Resampling.NEAREST
                )
            )
            result[mask_resized > 0.5] = np.nan

        return result
    except ImportError:
        # Fall back to skimage
        from skimage.transform import resize

        return resize(
            data,
            out_shape,
            order=3,  # Cubic as approximation
            anti_aliasing=True,
            preserve_range=True,
        )


def smooth_raster(window_size=None, nodata_value=np.nan):
    """
    Create a raster smoothing transform function with specified parameters.

    Args:
        window_size: Size of median filter window (defaults to 5% of smallest
            dimension if None)
        nodata_value: Value to treat as no data (default: np.nan)

    Returns:
        function: A transform function that smooths raster data
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """
        Apply median smoothing to raster data

        Args:
            raster_data: Input raster numpy array
            transform: Optional affine transform (unchanged by smoothing)

        Returns:
            tuple: (smoothed_data, transform, None)
        """
        logger.info("Applying median smoothing to raster")

        # BUG FIX: with the default NaN sentinel, `raster_data == nodata_value`
        # never matches (NaN != NaN), so nodata was silently smoothed over.
        # Use the same NaN-aware detection as downsample_raster.
        if np.isnan(nodata_value):
            mask = np.isnan(raster_data)
        else:
            mask = raster_data == nodata_value

        # Prepare data for smoothing
        processed_data = raster_data.copy()
        processed_data[mask] = np.nan

        # Calculate window size if not provided (5% of smallest dimension).
        # Guard with max(1, ...) so tiny rasters (<20px) don't produce a
        # zero-sized filter window, which scipy rejects.
        actual_window_size = window_size
        if actual_window_size is None:
            actual_window_size = max(
                1, int(np.floor(np.min(processed_data.shape) * 0.05))
            )
            logger.info(f"Using calculated window size: {actual_window_size}")

        # Apply median filter
        smoothed = ndimage.median_filter(processed_data, size=actual_window_size)

        # Restore nodata values
        smoothed[mask] = nodata_value

        logger.info(f"Smoothing window size: {actual_window_size}")
        logger.info(
            f"Value range before smoothing: {np.nanmin(processed_data):.2f} to {np.nanmax(processed_data):.2f}"
        )
        logger.info(
            f"Value range after smoothing: {np.nanmin(smoothed):.2f} to {np.nanmax(smoothed):.2f}"
        )

        # Transform is unchanged by smoothing, return 3-tuple for consistency
        return smoothed, transform, None

    # Set descriptive name for logging
    transform.__name__ = f"median_smooth(w={window_size or 'auto'})"
    return transform
def flip_raster(axis="horizontal"):
    """
    Create a transform function that mirrors (flips) the DEM data.

    axis='horizontal' flips top ↔ bottom (row 0 becomes row height-1);
    axis='vertical' flips left ↔ right (column 0 becomes column width-1).
    """

    def transform_func(data, transform=None):
        """Flip array along specified axis and update transform if provided."""
        # Guard clause: reject anything but the two supported axes.
        if axis not in ("horizontal", "vertical"):
            raise ValueError("axis must be 'horizontal' or 'vertical'.")

        # Flip in pixel space (flipud swaps rows, fliplr swaps columns).
        flipped = np.flipud(data) if axis == "horizontal" else np.fliplr(data)

        if transform is None:
            # No georeferencing to update.
            return (flipped, None, None)

        height, width = data.shape
        # A pure flip must never change the raster dimensions.
        assert (
            flipped.shape[0] == height and flipped.shape[1] == width
        ), "Flip changed array size unexpectedly!"

        # Affine(a, b, c, d, e, f) maps pixel (col, row) to world coords;
        # to_gdal() yields (xoff, a, b, yoff, d, e) with c=xoff, f=yoff.
        xoff, a, b, yoff, d, e = transform.to_gdal()

        if axis == "horizontal":
            # Row direction reverses: negate the row step and move the origin
            # to where the last row used to start.
            # NOTE(review): uses a (height-1) offset, i.e. pixel-center style;
            # confirm against the pixel-corner (height) convention if the
            # output appears shifted by one pixel.
            new_transform = Affine(a, b, xoff, d, -e, yoff + (height - 1) * e)
        else:
            # Column direction reverses: negate the column step and move the
            # origin to where the last column used to start.
            new_transform = Affine(-a, b, xoff + (width - 1) * a, d, e, yoff)

        return (flipped, new_transform, None)

    # Set descriptive name for logging
    transform_func.__name__ = f"flip_{axis}"
    return transform_func
def scale_elevation(scale_factor=1.0, nodata_value=np.nan):
    """
    Create a raster elevation scaling transform function.

    Multiplies all elevation values by the scale factor. Useful for reducing
    or amplifying terrain height without changing horizontal scale.

    Args:
        scale_factor (float): Multiplication factor for elevation values (default: 1.0)
        nodata_value: Value to treat as no data (default: np.nan)

    Returns:
        function: A transform function that scales elevation data
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """
        Scale elevation values in raster data.

        Args:
            raster_data: Input raster numpy array
            transform: Optional affine transform (unchanged by scaling)

        Returns:
            tuple: (scaled_data, transform, None)
        """
        logger.info(f"Scaling elevation by factor {scale_factor}")

        # BUG FIX: with the default NaN sentinel, `raster_data == nodata_value`
        # never matches (NaN != NaN); it only worked by accident because
        # NaN * factor is still NaN. Use NaN-aware detection, consistent with
        # downsample_raster and feature_preserving_smooth.
        if np.isnan(nodata_value):
            mask = np.isnan(raster_data)
        else:
            mask = raster_data == nodata_value

        # Scale valid cells, explicitly preserve the nodata sentinel.
        scaled_data = raster_data.copy()
        scaled_data[~mask] = raster_data[~mask] * scale_factor
        scaled_data[mask] = nodata_value

        logger.info(f"Original range: {np.nanmin(raster_data):.2f} to {np.nanmax(raster_data):.2f}")
        logger.info(f"Scaled range: {np.nanmin(scaled_data):.2f} to {np.nanmax(scaled_data):.2f}")

        # Transform is unchanged by scaling (affects Z only)
        return scaled_data, transform, None

    # Set descriptive name for logging
    transform.__name__ = f"scale_elev(×{scale_factor})"

    # Store metadata for algorithms that need to know the scale factor
    # This allows slope_adaptive_smooth to compensate for prior scaling
    transform._elevation_scale_factor = scale_factor
    return transform
def reproject_raster(src_crs="EPSG:4326", dst_crs="EPSG:32617", nodata_value=np.nan, num_threads=4):
    """
    Generalized raster reprojection function

    Args:
        src_crs: Source coordinate reference system
        dst_crs: Destination coordinate reference system
        nodata_value: Value to use for areas outside original data
        num_threads: Number of threads for parallel processing

    Returns:
        Function that transforms data and returns (data, transform, new_crs)
    """

    def _reproject_raster(src_data, src_transform):
        logger = logging.getLogger(__name__)
        logger.info(f"Reprojecting raster from {src_crs} to {dst_crs}")

        # GDAL_NUM_THREADS scopes GDAL's internal warp parallelism to this
        # environment block only.
        with rasterio.Env(GDAL_NUM_THREADS=str(num_threads)):
            # Calculate transform and dimensions for destination CRS from the
            # source array's georeferenced bounds.
            dst_transform, width, height = calculate_default_transform(
                src_crs,
                dst_crs,
                src_data.shape[1],
                src_data.shape[0],
                *rasterio.transform.array_bounds(
                    src_data.shape[0], src_data.shape[1], src_transform
                ),
            )

            # Create destination array pre-filled with nodata so pixels that
            # fall outside the source footprint keep the sentinel value.
            dst_data = np.full((height, width), nodata_value, dtype=src_data.dtype)

            # Reproject with bilinear interpolation
            reproject(
                source=src_data,
                destination=dst_data,
                src_transform=src_transform,
                src_crs=src_crs,
                dst_transform=dst_transform,
                dst_crs=dst_crs,
                resampling=Resampling.bilinear,
                dst_nodata=nodata_value,
                # NOTE(review): num_threads <= 0 is passed through as None —
                # confirm rasterio treats None as "use its default threading".
                num_threads=num_threads if num_threads > 0 else None,
                warp_mem_limit=2048,  # Increased from 512MB to 2GB for better performance
            )

        logger.info(f"Reprojection complete. New shape: {dst_data.shape}")
        logger.info(f"Value range: {np.nanmin(dst_data):.2f} to {np.nanmax(dst_data):.2f}")

        # Return all three values
        return dst_data, dst_transform, dst_crs

    # Set descriptive name for logging
    _reproject_raster.__name__ = f"reproject({src_crs}→{dst_crs})"
    return _reproject_raster
def cached_reproject(
    src_crs="EPSG:4326",
    dst_crs="EPSG:32617",
    cache_dir="data/cache/reprojected",
    nodata_value=np.nan,
    num_threads=0,  # 0 = auto-detect CPU cores (much faster on multi-core systems)
):
    """
    Cached raster reprojection - saves reprojected DEM to disk for instant reuse.

    First call computes and caches the reprojection. Subsequent calls load from
    cache (~0.5s). The cache key combines the CRS pair, the source shape and
    transform, and a coarse fingerprint sampled from the source values, so a
    changed DEM with identical geometry no longer hits a stale cache entry.

    Optimizations:
    - num_threads=0 (auto-detect): Uses all available CPU cores (much faster)
    - warp_mem_limit=2048MB: Increased from 512MB for better tiling and fewer passes

    Args:
        src_crs: Source coordinate reference system
        dst_crs: Destination coordinate reference system
        cache_dir: Directory to store cached reprojections
        nodata_value: Value to use for areas outside original data
        num_threads: Number of GDAL threads (0=auto-detect, 4 is default GDAL)

    Returns:
        Function that transforms data and returns (data, transform, new_crs)

    Example:
        >>> # First run: computes and caches
        >>> terrain.add_transform(cached_reproject(src_crs="EPSG:4326", dst_crs="EPSG:32617"))
        >>> # Second run: ~0.5s (loads from cache)
    """
    from pathlib import Path
    import hashlib

    cache_path = Path(cache_dir)
    cache_path.mkdir(parents=True, exist_ok=True)

    def _cached_reproject(src_data, src_transform):
        logger = logging.getLogger(__name__)

        # BUG FIX: the key previously covered only shape/transform/CRS, so two
        # different rasters with identical geometry shared one cache entry
        # (the docstring claimed a "source data hash" that didn't exist).
        # Fold in a coarse sample (~64 values) as a cheap content fingerprint.
        sample = src_data[
            :: max(1, src_data.shape[0] // 8), :: max(1, src_data.shape[1] // 8)
        ]
        data_hash = hashlib.md5(
            f"{src_data.shape}_{src_transform}_{src_crs}_{dst_crs}".encode()
            + np.ascontiguousarray(sample).tobytes()
        ).hexdigest()[:12]

        stem = f"reproject_{src_crs.replace(':', '_')}_to_{dst_crs.replace(':', '_')}_{data_hash}"
        cache_file = cache_path / f"{stem}.npz"
        meta_file = cache_path / f"{stem}_meta.npz"

        # Try to load from cache
        if cache_file.exists() and meta_file.exists():
            logger.info(f"Loading cached reprojection from {cache_file.name}")
            cached = np.load(cache_file)
            meta = np.load(meta_file, allow_pickle=True)
            dst_data = cached["data"]
            dst_transform = Affine(*meta["transform"])
            logger.info(f"  Loaded {dst_data.shape} in cache hit (instant)")
            return dst_data, dst_transform, dst_crs

        # Cache miss - compute reprojection
        logger.info(f"Cache miss - reprojecting {src_crs}→{dst_crs} (will cache result)")

        with rasterio.Env(GDAL_NUM_THREADS=str(num_threads)):
            # Calculate transform and dimensions for destination CRS
            dst_transform, width, height = calculate_default_transform(
                src_crs,
                dst_crs,
                src_data.shape[1],
                src_data.shape[0],
                *rasterio.transform.array_bounds(
                    src_data.shape[0], src_data.shape[1], src_transform
                ),
            )

            # Create destination array pre-filled with the nodata sentinel
            dst_data = np.full((height, width), nodata_value, dtype=src_data.dtype)

            # Reproject with bilinear interpolation
            reproject(
                source=src_data,
                destination=dst_data,
                src_transform=src_transform,
                src_crs=src_crs,
                dst_transform=dst_transform,
                dst_crs=dst_crs,
                resampling=Resampling.bilinear,
                dst_nodata=nodata_value,
                num_threads=num_threads if num_threads > 0 else None,
                warp_mem_limit=2048,  # Increased from 512MB to 2GB for better performance
            )

        # Save to cache (data compressed; transform as its 6 affine coefficients)
        logger.info(f"Saving reprojection to cache: {cache_file.name}")
        np.savez_compressed(cache_file, data=dst_data)
        np.savez(meta_file, transform=list(dst_transform)[:6])

        logger.info(f"Reprojection complete. New shape: {dst_data.shape}")
        return dst_data, dst_transform, dst_crs

    _cached_reproject.__name__ = f"cached_reproject({src_crs}→{dst_crs})"
    return _cached_reproject


def downsample_then_reproject(
    src_crs="EPSG:4326",
    dst_crs="EPSG:32617",
    downsample_zoom_factor=0.1,
    downsample_method="average",
    nodata_value=np.nan,
    num_threads=0,
):
    """
    Optimized combined downsampling and reprojection transform.

    This combines downsampling and reprojection into a single operation to
    avoid large intermediate arrays. Downsamples FIRST in the source CRS,
    then reprojects the smaller result. This is especially beneficial for
    very large DEMs (855M+ pixels).

    Args:
        src_crs: Source coordinate reference system
        dst_crs: Destination coordinate reference system
        downsample_zoom_factor: Downsampling factor (e.g., 0.1 for 10% resolution)
        downsample_method: Downsampling method ("average", "lanczos", "cubic", "bilinear")
        nodata_value: Value to use for areas outside original data
        num_threads: Number of GDAL threads (0=auto-detect)

    Returns:
        Function that transforms data and returns (data, transform, new_crs)
    """

    def _downsample_then_reproject(src_data, src_transform):
        logger = logging.getLogger(__name__)

        # Step 1: Downsample in original CRS (avoids full-resolution reproject)
        logger.info(f"Downsampling {src_data.shape} before reprojection...")
        downsample_func = downsample_raster_optimized(
            zoom_factor=downsample_zoom_factor,
            method=downsample_method,
            nodata_value=nodata_value,
        )
        downsampled_data, downsampled_transform, _ = downsample_func(src_data, src_transform)
        logger.info(f"  → Downsampled to {downsampled_data.shape}")

        # Step 2: Reproject the downsampled data (much faster than full-resolution!)
        logger.info(f"Reprojecting downsampled data {src_crs}→{dst_crs}...")

        # Calculate transform for destination CRS
        bounds = rasterio.transform.array_bounds(
            downsampled_data.shape[0], downsampled_data.shape[1], downsampled_transform
        )
        dst_transform, width, height = calculate_default_transform(
            src_crs,
            dst_crs,
            downsampled_data.shape[1],
            downsampled_data.shape[0],
            *bounds,
        )

        # Create destination array pre-filled with the nodata sentinel
        dst_data = np.full((height, width), nodata_value, dtype=downsampled_data.dtype)

        # Reproject the downsampled data (not the original!)
        reproject(
            source=downsampled_data,
            destination=dst_data,
            src_transform=downsampled_transform,
            src_crs=src_crs,
            dst_transform=dst_transform,
            dst_crs=dst_crs,
            resampling=Resampling.bilinear,
            src_nodata=nodata_value,
            dst_nodata=nodata_value,
            # BUG FIX: honor the num_threads parameter (was hard-coded to 4,
            # silently ignoring the caller's setting); same pattern as
            # cached_reproject.
            num_threads=num_threads if num_threads > 0 else None,
        )

        logger.info(f"Reprojection complete. Result shape: {dst_data.shape}")
        return dst_data, dst_transform, dst_crs

    _downsample_then_reproject.__name__ = (
        f"downsample_then_reproject({src_crs}→{dst_crs}, zoom={downsample_zoom_factor})"
    )
    return _downsample_then_reproject
def feature_preserving_smooth(sigma_spatial=3.0, sigma_intensity=None, nodata_value=np.nan):
    """
    Create a feature-preserving smoothing transform using bilateral filtering.

    Removes high-frequency noise while preserving ridges, valleys, and drainage
    patterns. Bilateral filtering weights a spatial Gaussian by intensity
    similarity, so sharp elevation discontinuities survive the smoothing.

    Args:
        sigma_spatial: Spatial smoothing extent in pixels (default: 3.0).
            Larger = more smoothing. Typical range: 1-10 pixels.
        sigma_intensity: Intensity similarity threshold in elevation units.
            Larger = more smoothing across elevation differences.
            If None, auto-calculated as 5% of the elevation range.
        nodata_value: Value to treat as no data (default: np.nan)

    Returns:
        function: Transform function compatible with terrain.add_transform()
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """
        Apply bilateral filtering to raster data.

        Args:
            raster_data: Input raster numpy array
            transform: Optional affine transform (unchanged by smoothing)

        Returns:
            tuple: (smoothed_data, transform, None)
        """
        logger.info(f"Applying feature-preserving smoothing (sigma_spatial={sigma_spatial})")

        # NaN sentinels need isnan(); equality never matches NaN.
        nodata_mask = (
            np.isnan(raster_data) if np.isnan(nodata_value) else raster_data == nodata_value
        )

        # Work in float64 for precision, with nodata blanked to NaN.
        work = raster_data.astype(np.float64)
        work[nodata_mask] = np.nan

        # Resolve the intensity sigma: explicit value, or 5% of the elevation
        # range (with sane floors for flat or fully-masked rasters).
        sigma_color = sigma_intensity
        if sigma_color is None:
            valid = work[~nodata_mask]
            if len(valid) > 0:
                span = np.nanmax(valid) - np.nanmin(valid)
                sigma_color = max(span * 0.05, 1.0)
            else:
                sigma_color = 10.0
            logger.info(f"Auto-calculated sigma_intensity: {sigma_color:.2f}")

        # Delegate the actual filtering, then re-stamp the nodata sentinel.
        smoothed = _bilateral_filter_2d(work, sigma_spatial, sigma_color, nodata_mask)
        smoothed[nodata_mask] = nodata_value

        logger.info(
            f"Value range before: {np.nanmin(work):.2f} to {np.nanmax(work):.2f}"
        )
        logger.info(
            f"Value range after: {np.nanmin(smoothed):.2f} to {np.nanmax(smoothed):.2f}"
        )

        # Transform is unchanged by smoothing; cast back to the input dtype.
        return smoothed.astype(raster_data.dtype), transform, None

    # Set descriptive name for logging
    transform.__name__ = f"bilateral_smooth(σ={sigma_spatial})"
    return transform
def _bilateral_filter_2d(data, sigma_spatial, sigma_intensity, mask):
    """
    Apply bilateral filtering to 2D elevation data.

    Implementation priority (fastest first):
    1. OpenCV (~250M pixels/sec) - highly optimized SIMD implementation
    2. skimage (~0.8M pixels/sec) - good C implementation
    3. Pure numpy fallback (~1K pixels/sec) - last resort

    Args:
        data: 2D numpy array of elevation values
        sigma_spatial: Spatial Gaussian sigma in pixels
        sigma_intensity: Intensity Gaussian sigma in elevation units
        mask: Boolean mask where True = nodata

    Returns:
        Filtered 2D array (float64); masked cells are set to NaN.
    """
    import time

    logger = logging.getLogger(__name__)

    height, width = data.shape
    n_pixels = height * width

    # Sanity check: cap sigma_spatial to avoid memory issues
    # OpenCV kernel diameter ≈ 6*sigma, so sigma=100 → ~600px kernel → 360K comparisons/pixel
    MAX_SIGMA_SPATIAL = 15.0  # Practical limit for bilateral filter
    if sigma_spatial > MAX_SIGMA_SPATIAL:
        logger.warning(
            f"sigma_spatial={sigma_spatial} is very large (max recommended: {MAX_SIGMA_SPATIAL}). "
            f"Capping to {MAX_SIGMA_SPATIAL} to avoid memory issues. "
            f"For stronger smoothing, apply multiple passes or downsample first."
        )
        sigma_spatial = MAX_SIGMA_SPATIAL

    logger.info(f"Bilateral filter: {height}×{width} = {n_pixels/1e6:.1f}M pixels")
    start_time = time.time()

    # Fill NaN values temporarily (neither OpenCV nor skimage handle NaN).
    # float32 keeps memory down and matches cv2.bilateralFilter's input needs.
    data_filled = data.copy().astype(np.float32)
    if np.any(mask):
        valid_median = np.nanmedian(data[~mask])
        data_filled[mask] = valid_median

    # Try OpenCV first (500x faster than skimage)
    try:
        import cv2

        logger.info("  Using OpenCV (highly optimized)...")

        # OpenCV bilateral: d=-1 means auto-calculate diameter from sigmaSpace
        # Use explicit diameter for better control: d = ceil(6*sigma) | 1 (ensure odd)
        diameter = int(np.ceil(6 * sigma_spatial)) | 1
        logger.info(f"  Kernel diameter: {diameter}px")

        result = cv2.bilateralFilter(
            data_filled,
            d=diameter,
            sigmaColor=float(sigma_intensity),
            sigmaSpace=float(sigma_spatial),
        )

        # Restore NaN values
        result = result.astype(np.float64)
        result[mask] = np.nan

        elapsed = time.time() - start_time
        throughput = n_pixels / elapsed / 1e6
        logger.info(f"  ✓ Bilateral filter complete in {elapsed:.2f}s ({throughput:.1f}M pixels/sec)")
        return result

    except ImportError:
        pass  # Fall through to skimage
    # NOTE: cv2.error is only evaluated if an exception is raised AFTER the
    # `import cv2` succeeded, so referencing cv2 here is safe.
    except cv2.error as e:
        logger.warning(f"  OpenCV bilateral filter failed: {e}")
        logger.warning("  Falling back to skimage...")
    except MemoryError:
        logger.warning("  OpenCV ran out of memory, falling back to skimage...")

    # Try skimage as fallback
    try:
        from skimage.restoration import denoise_bilateral

        logger.info("  Using skimage (install opencv-python-headless for 500x speedup)...")

        result = denoise_bilateral(
            data_filled.astype(np.float64),
            sigma_color=sigma_intensity,
            sigma_spatial=sigma_spatial,
            mode="reflect",
            channel_axis=None,
        )
        result[mask] = np.nan

        elapsed = time.time() - start_time
        throughput = n_pixels / elapsed / 1e6
        logger.info(f"  ✓ Bilateral filter complete in {elapsed:.1f}s ({throughput:.1f}M pixels/sec)")
        return result

    except ImportError:
        logger.warning(
            "  Neither OpenCV nor skimage available, using slow fallback. "
            "Install opencv-python-headless for 500x speedup."
        )
        # Pass the original (NaN-carrying) data: the fallback handles the
        # mask itself and reuses start_time for accurate throughput logs.
        return _bilateral_filter_2d_fallback(
            data, sigma_spatial, sigma_intensity, mask, start_time
        )


def _bilateral_filter_2d_fallback(data, sigma_spatial, sigma_intensity, mask, start_time=None):
    """
    Pure-numpy bilateral filter fallback (slow but works without dependencies).

    This is O(H * W * K^2) and will be slow for large images.

    Args:
        data: 2D numpy array of elevation values (may contain NaN)
        sigma_spatial: Spatial Gaussian sigma in pixels
        sigma_intensity: Intensity Gaussian sigma in elevation units
        mask: Boolean mask where True = nodata
        start_time: Optional epoch seconds from the caller, used so the
            logged elapsed time covers the whole filtering attempt.

    Returns:
        Filtered 2D array; masked cells are set to NaN.
    """
    import time

    logger = logging.getLogger(__name__)

    height, width = data.shape
    total_pixels = height * width

    if start_time is None:
        start_time = time.time()

    # Estimate: ~1000 pixels/sec for fallback
    estimated_time = total_pixels / 1000
    logger.warning(
        f"  ⚠ Slow fallback: {height}×{width} = {total_pixels:,} pixels. "
        f"Estimated ~{estimated_time/60:.0f} minutes!"
    )

    # Determine kernel size (3 * sigma covers 99.7% of Gaussian)
    kernel_radius = int(np.ceil(3 * sigma_spatial))
    kernel_size = 2 * kernel_radius + 1

    # Create spatial weight kernel (constant, compute once)
    y_offsets, x_offsets = np.ogrid[
        -kernel_radius : kernel_radius + 1, -kernel_radius : kernel_radius + 1
    ]
    spatial_weights = np.exp(-(x_offsets**2 + y_offsets**2) / (2 * sigma_spatial**2))

    # Pad data for boundary handling (reflect mode preserves edge values).
    # The mask is padded with True so padded border cells get zero weight.
    padded = np.pad(data, kernel_radius, mode="reflect")
    padded_mask = np.pad(mask, kernel_radius, mode="constant", constant_values=True)

    # Output array
    result = np.zeros_like(data)

    # Process with progress logging and ETA
    log_interval = max(1, height // 10)

    for i in range(height):
        if i > 0 and i % log_interval == 0:
            elapsed = time.time() - start_time
            rows_per_sec = i / elapsed
            remaining_rows = height - i
            eta = remaining_rows / rows_per_sec if rows_per_sec > 0 else 0
            logger.info(
                f"  Progress: {i}/{height} rows ({100*i/height:.0f}%) - "
                f"elapsed {elapsed:.0f}s, ETA {eta:.0f}s"
            )

        for j in range(width):
            # Nodata pixels stay nodata (NaN) in the output.
            if mask[i, j]:
                result[i, j] = np.nan
                continue

            # Extract neighborhood (padded indices line up with (i, j))
            neighborhood = padded[i : i + kernel_size, j : j + kernel_size]
            neighborhood_mask = padded_mask[i : i + kernel_size, j : j + kernel_size]

            # Center value
            center_value = data[i, j]

            # Intensity weights based on difference from center
            intensity_diff = neighborhood - center_value
            intensity_weights = np.exp(
                -(intensity_diff**2) / (2 * sigma_intensity**2)
            )

            # Combined weights (spatial * intensity); zero out nodata and
            # NaN neighbors so they cannot contribute to the average.
            combined_weights = spatial_weights * intensity_weights
            combined_weights[neighborhood_mask] = 0
            combined_weights[np.isnan(neighborhood)] = 0

            # Weighted average
            weight_sum = np.sum(combined_weights)
            if weight_sum > 0:
                # Replace NaN with 0 before the product; the corresponding
                # weights are already 0, so this only avoids NaN*0 = NaN.
                valid_neighborhood = np.where(
                    np.isnan(neighborhood), 0, neighborhood
                )
                result[i, j] = np.sum(valid_neighborhood * combined_weights) / weight_sum
            else:
                # No usable neighbors: keep the original value.
                result[i, j] = center_value

    elapsed = time.time() - start_time
    throughput = total_pixels / elapsed
    logger.info(
        f"  ✓ Bilateral filter complete in {elapsed:.1f}s "
        f"({throughput:.0f} pixels/sec)"
    )

    return result
def smooth_score_data(
    scores: np.ndarray,
    sigma_spatial: float = 3.0,
    sigma_intensity: float | None = None,
) -> np.ndarray:
    """
    Smooth score data using bilateral filtering.

    Applies feature-preserving smoothing to reduce blocky pixelation from
    low-resolution source data (e.g., SNODAS ~925m) when displayed on
    high-resolution terrain (~30m DEM).

    Uses bilateral filtering: smooths within similar-intensity regions
    while preserving edges between different score zones.

    Args:
        scores: 2D numpy array of score values (typically 0-1 range)
        sigma_spatial: Spatial smoothing extent in pixels (default: 3.0).
            Larger = more smoothing. Typical range: 1-10 pixels.
        sigma_intensity: Intensity similarity threshold in score units.
            Larger = more smoothing across score differences.
            If None, auto-calculated as 15% of score range (good for 0-1 data).

    Returns:
        Smoothed score array with same shape as input.
        NaN values are preserved. Output is clipped to [0, 1] range.

    Example:
        >>> # Smooth blocky SNODAS-derived scores
        >>> sledding_scores = load_score_grid("sledding_scores.npz")
        >>> smoothed = smooth_score_data(sledding_scores, sigma_spatial=5.0)
    """
    logger = logging.getLogger(__name__)

    # Create mask for NaN values (NaN is the only nodata marker here)
    mask = np.isnan(scores)

    # Calculate sigma_intensity if not provided
    # For score data (0-1 range), use ~15% of range for good edge preservation
    if sigma_intensity is None:
        valid_data = scores[~mask]
        if len(valid_data) > 0:
            score_range = np.nanmax(valid_data) - np.nanmin(valid_data)
            sigma_intensity = max(score_range * 0.15, 0.05)  # At least 0.05
        else:
            sigma_intensity = 0.15  # Default for 0-1 range
        logger.debug(f"Auto-calculated sigma_intensity: {sigma_intensity:.3f}")

    logger.info(
        f"Smoothing score data (sigma_spatial={sigma_spatial}, sigma_intensity={sigma_intensity:.3f})"
    )

    # Apply bilateral filter (delegates backend selection to the helper)
    smoothed = _bilateral_filter_2d(
        scores.astype(np.float64),
        sigma_spatial=sigma_spatial,
        sigma_intensity=sigma_intensity,
        mask=mask,
    )

    # Clip to valid score range [0, 1] (bilateral can slightly exceed bounds)
    smoothed = np.clip(smoothed, 0.0, 1.0)

    # Restore NaN values
    smoothed[mask] = np.nan

    return smoothed
def despeckle_scores(
    scores: np.ndarray,
    kernel_size: int = 3,
) -> np.ndarray:
    """
    Remove isolated speckles from score data via median filtering (GPU-accelerated).

    Each pixel is replaced by the median of its neighborhood, which wipes out
    isolated outlier pixels while leaving larger regions intact — unlike
    bilateral filtering, which preserves edges. Uses PyTorch GPU acceleration
    when available (5-10x speedup on CUDA).

    Use case: SNODAS snow data upsampled to a high-res DEM often has isolated
    low-score pixels (speckles) in otherwise high-score regions due to the
    resolution mismatch; these show up as visual noise in the rendered terrain.

    Args:
        scores: 2D array of score values (typically 0-1 range)
        kernel_size: Size of median filter kernel (default: 3 for 3x3).
            Larger kernels remove larger speckle clusters but may affect
            legitimate small features. Common values: 3, 5, 7.

    Returns:
        Despeckled score array (float32) with same shape as input.
        NaN values are preserved; output is clipped to [0, 1].

    Example:
        >>> # Remove single-pixel speckles
        >>> despeckled = despeckle_scores(scores, kernel_size=3)
        >>> # Remove up to 2x2 speckle clusters
        >>> despeckled = despeckle_scores(scores, kernel_size=5)
    """
    from src.terrain.gpu_ops import gpu_median_filter

    logger = logging.getLogger(__name__)

    if scores.ndim != 2:
        raise ValueError(f"scores must be 2D, got {scores.ndim}D")

    # NaN cells are temporarily zero-filled: the median filter cannot
    # handle NaN, and the mask lets us restore them afterwards.
    nan_mask = np.isnan(scores)
    filled = scores.copy()
    filled[nan_mask] = 0.0

    logger.info(f"Despeckle score data (kernel_size={kernel_size})")

    # GPU-accelerated median filter on float32 input.
    result = gpu_median_filter(filled.astype(np.float32), kernel_size=kernel_size)

    # Median should already stay in range; clip defensively, then restore NaN.
    result = np.clip(result, 0.0, 1.0)
    result[nan_mask] = np.nan

    return result.astype(np.float32)
def wavelet_denoise_dem(
    nodata_value=np.nan,
    wavelet: str = "db4",
    levels: int = 3,
    threshold_sigma: float = 2.0,
    preserve_structure: bool = True,
):
    """
    Create a transform that removes noise while preserving terrain structure.

    Uses wavelet decomposition to separate terrain features (ridges, valleys,
    drainage patterns) from high-frequency noise. This is smarter than median
    filtering because it understands that terrain has structure at certain
    spatial frequencies.

    How it works:
    1. Decompose DEM into frequency bands using wavelets
    2. Estimate noise level from finest (highest-frequency) band
    3. Apply soft thresholding to remove coefficients below noise threshold
    4. Reconstruct DEM from cleaned coefficients

    The result preserves terrain structure while removing sensor noise,
    SRTM artifacts, and other high-frequency disturbances.

    Args:
        nodata_value: Value to treat as no data (default: np.nan)
        wavelet: Wavelet type (default: "db4" - Daubechies 4).
            Options: "db4" (smooth), "haar" (sharp edges), "sym4" (symmetric).
            - "db4": Best for natural terrain (smooth transitions)
            - "haar": Best for urban/artificial structures
            - "sym4": Good balance, symmetric filtering
        levels: Decomposition levels (default: 3).
            More levels = coarser structure preserved. Each level halves
            the resolution.
            - 2: Preserves finer detail, removes less noise
            - 3: Good balance (recommended)
            - 4: Aggressive smoothing, may blur small features
        threshold_sigma: Noise threshold multiplier (default: 2.0).
            Higher = more aggressive denoising.
            - 1.5: Light denoising, preserves more detail
            - 2.0: Standard denoising (recommended)
            - 3.0: Aggressive, may remove subtle features
        preserve_structure: If True, only denoise highest-frequency band
            to maximize structure preservation. If False, denoise all bands.

    Returns:
        function: A transform function for use with terrain.add_transform()

    Example:
        >>> # Standard terrain denoising
        >>> terrain.add_transform(wavelet_denoise_dem())
        >>> # Aggressive denoising for very noisy DEM
        >>> terrain.add_transform(wavelet_denoise_dem(threshold_sigma=3.0, levels=4))
        >>> # Light denoising, preserve maximum detail
        >>> terrain.add_transform(wavelet_denoise_dem(threshold_sigma=1.5, levels=2))
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """Apply wavelet denoising to DEM; returns (denoised, transform, None)."""
        try:
            import pywt
        except ImportError:
            logger.warning(
                "PyWavelets (pywt) not installed. Install with: pip install PyWavelets"
            )
            logger.warning("Falling back to median filter despeckle")
            # Fallback to simple median filter
            # NOTE(review): this fallback does not handle nodata the way the
            # wavelet path does — nodata cells pass through the median filter.
            from scipy.ndimage import median_filter

            return median_filter(raster_data, size=3), transform, None

        logger.info(
            f"Wavelet denoising DEM (wavelet={wavelet}, levels={levels}, "
            f"sigma={threshold_sigma}, preserve_structure={preserve_structure})"
        )

        # Handle nodata
        if np.isnan(nodata_value):
            mask = np.isnan(raster_data)
        else:
            mask = raster_data == nodata_value

        # Check if we have enough valid data to denoise
        valid_count = np.sum(~mask)
        total_count = mask.size
        valid_ratio = valid_count / total_count

        if valid_ratio < 0.1:
            # Too little signal to estimate noise reliably: return unchanged.
            logger.warning(
                f"Only {100*valid_ratio:.1f}% valid data ({valid_count}/{total_count} pixels). "
                "Skipping wavelet denoising."
            )
            return raster_data.copy(), transform, None

        logger.info(f"  Valid data: {100*valid_ratio:.1f}% ({valid_count} pixels)")

        # Fill nodata with local median for processing
        data = raster_data.copy().astype(np.float64)
        if np.any(mask):
            from scipy.ndimage import median_filter

            # Use valid data mean as fallback for areas where median is also NaN
            valid_mean = np.nanmean(raster_data)
            filled = median_filter(np.nan_to_num(data, nan=valid_mean), size=5)
            data[mask] = filled[mask]

        # Wavelet decomposition (coeffs[0] = approximation, then detail
        # tuples ordered coarsest → finest)
        coeffs = pywt.wavedec2(data, wavelet, level=levels)

        # Estimate noise from finest detail coefficients (HH band)
        # MAD (median absolute deviation) is robust noise estimator
        detail_coeffs = coeffs[-1]  # Finest level: (cH, cV, cD)
        hh = detail_coeffs[2]  # Diagonal detail (usually noisiest)
        sigma = np.median(np.abs(hh)) / 0.6745  # MAD to sigma conversion

        logger.info(f"  Estimated noise sigma: {sigma:.4f}")

        # Apply soft thresholding
        threshold = threshold_sigma * sigma

        def soft_threshold(c, thresh):
            """Soft thresholding: shrink coefficients toward zero."""
            return np.sign(c) * np.maximum(np.abs(c) - thresh, 0)

        # Threshold detail coefficients
        denoised_coeffs = [coeffs[0]]  # Keep approximation (coarsest level)
        for i, (cH, cV, cD) in enumerate(coeffs[1:]):
            # coeffs[1:] has `levels` tuples; index levels-1 == len(coeffs)-2
            # is the finest band, which is always thresholded.
            if preserve_structure and i < len(coeffs) - 2:
                # Only denoise finest level(s), preserve coarser structure
                denoised_coeffs.append((cH, cV, cD))
            else:
                # Apply thresholding
                denoised_coeffs.append((
                    soft_threshold(cH, threshold),
                    soft_threshold(cV, threshold),
                    soft_threshold(cD, threshold),
                ))

        # Reconstruct
        denoised = pywt.waverec2(denoised_coeffs, wavelet)

        # Trim to original size (wavelet reconstruction may add padding)
        denoised = denoised[:raster_data.shape[0], :raster_data.shape[1]]

        # Restore nodata
        denoised[mask] = nodata_value

        logger.info(
            f"  Value range: {np.nanmin(raster_data):.2f}{np.nanmin(denoised):.2f} to "
            f"{np.nanmax(raster_data):.2f}{np.nanmax(denoised):.2f}"
        )

        return denoised.astype(raster_data.dtype), transform, None

    # Descriptive name so pipeline logs identify this step.
    transform.__name__ = f"wavelet_denoise(w={wavelet},L={levels},σ={threshold_sigma})"
    return transform
def despeckle_dem(nodata_value=np.nan, kernel_size: int = 3):
    """
    Build a transform that removes isolated elevation noise via median filtering
    (GPU-accelerated).

    Median filtering uniformly suppresses local outliers/speckles across the
    whole DEM, unlike bilateral smoothing (--smooth) which preserves edges but
    can look patchy. This is better for removing sensor noise or small DEM
    artifacts. Uses PyTorch GPU acceleration when available (5-10x speedup on
    CUDA). For smarter frequency-aware denoising that preserves terrain
    structure, use wavelet_denoise_dem() instead.

    Args:
        nodata_value: Value to treat as no data (default: np.nan)
        kernel_size: Size of median filter kernel (default: 3 for 3x3).
            Must be odd integer ≥3. Larger = more smoothing.
            - 3: Removes single-pixel noise (recommended)
            - 5: Removes 2x2 artifacts
            - 7: Stronger smoothing, may affect small terrain features

    Returns:
        function: A transform function for use with terrain.add_transform()

    Example:
        >>> terrain.add_transform(despeckle_dem(kernel_size=3))
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, transform=None):
        """
        Median-filter the DEM to remove isolated speckles/noise.

        Args:
            raster_data: Input raster numpy array (elevation data)
            transform: Optional affine transform (unchanged by filtering)

        Returns:
            tuple: (despeckled_data, transform, None)
        """
        from src.terrain.gpu_ops import gpu_median_filter

        logger.info(f"Despeckle DEM (kernel_size={kernel_size})")

        # Locate nodata cells so they can be excluded and restored afterwards.
        nodata_mask = (
            np.isnan(raster_data) if np.isnan(nodata_value) else raster_data == nodata_value
        )

        # Median filters cannot cope with nodata; substitute the valid median.
        working = raster_data.copy().astype(np.float32)
        if np.any(nodata_mask):
            working[nodata_mask] = np.nanmedian(raster_data[~nodata_mask])

        # GPU-accelerated median filter.
        filtered = gpu_median_filter(working, kernel_size=kernel_size)

        # Put the original nodata marker back.
        filtered[nodata_mask] = nodata_value

        logger.info(
            f"Value range before: {np.nanmin(raster_data):.2f} to {np.nanmax(raster_data):.2f}"
        )
        logger.info(
            f"Value range after: {np.nanmin(filtered):.2f} to {np.nanmax(filtered):.2f}"
        )

        return filtered.astype(raster_data.dtype), transform, None

    # Descriptive name so pipeline logs identify this step.
    transform.__name__ = f"despeckle_dem(k={kernel_size})"
    return transform
def remove_bumps(kernel_size: int = 3, structure: str = "disk", strength: float = 1.0):
    """
    Remove local maxima (bumps) from DEM using morphological opening.

    Morphological opening = erosion followed by dilation. This operation:
    - Removes small bright features (buildings, trees, noise)
    - Never creates new local maxima (mathematically guaranteed)
    - Preserves larger terrain features and overall shape
    - Leaves valleys and depressions untouched

    This is the standard approach for "removing buildings from DEMs" in
    geospatial processing.

    Args:
        kernel_size: Size of the structuring element (default: 3).
            Controls the maximum size of bumps to remove:
            - 1: Removes features up to ~2 pixels across (very subtle)
            - 3: Removes features up to ~6 pixels across
            - 5: Removes features up to ~10 pixels across
            For 30m DEMs, size=3 removes ~180m features
        structure: Shape of structuring element (default: "disk").
            - "disk": Circular, isotropic (recommended)
            - "square": Faster but may create artifacts on diagonals
        strength: Blend factor between original and opened result (default: 1.0).
            - 0.0: No effect (returns original)
            - 0.5: Half the bump removal effect (subtle)
            - 1.0: Full bump removal (original behavior)
            Values between 0 and 1 provide fine-grained control.

    Returns:
        function: A transform function for use with terrain.add_transform()

    Example:
        >>> # Remove small bumps (buildings on 30m DEM)
        >>> terrain.add_transform(remove_bumps(kernel_size=3))
        >>> # Subtle bump reduction (50% strength)
        >>> terrain.add_transform(remove_bumps(kernel_size=1, strength=0.5))
        >>> # More aggressive bump removal
        >>> terrain.add_transform(remove_bumps(kernel_size=5))
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, affine_transform=None):
        """Apply morphological opening; returns (result, affine_transform, None)."""
        from scipy.ndimage import grey_opening

        strength_str = f", strength={strength}" if strength < 1.0 else ""
        logger.info(
            f"Removing bumps via morphological opening (kernel={kernel_size}, structure={structure}{strength_str})"
        )

        # Handle nodata
        if np.issubdtype(raster_data.dtype, np.floating):
            mask = np.isnan(raster_data)
            nodata_value = np.nan
        else:
            # Integer rasters carry no NaN, so nothing is treated as nodata.
            # BUGFIX: previously np.zeros() was called with the dtype where a
            # shape is required, raising TypeError for any integer input.
            mask = np.zeros(raster_data.shape, dtype=bool)
            nodata_value = None

        data = raster_data.astype(np.float64).copy()

        # Fill nodata temporarily so the opening sees a complete surface.
        if np.any(mask):
            valid_median = np.nanmedian(data[~mask])
            data[mask] = valid_median

        # Structuring element built with numpy, equivalent to
        # skimage.morphology.disk(kernel_size) / square(kernel_size)
        # but without the third-party dependency.
        if structure == "disk":
            r = kernel_size
            yy, xx = np.ogrid[-r : r + 1, -r : r + 1]
            selem = (xx * xx + yy * yy) <= r * r
        else:
            selem = np.ones((kernel_size, kernel_size), dtype=bool)

        # Morphological opening: erosion then dilation
        # - Erosion shrinks bright regions, removing small peaks
        # - Dilation restores shape, but removed peaks stay removed
        opened = grey_opening(data, footprint=selem)

        # Apply strength blending: result = original * (1-strength) + opened * strength
        # This gives continuous control from no effect (0) to full effect (1)
        if strength < 1.0:
            result = data * (1.0 - strength) + opened * strength
        else:
            result = opened

        # Compute statistics (based on actual effect after blending)
        diff = data - result
        bumps_removed = np.sum(diff > 0.1)  # Pixels lowered by >0.1m
        max_reduction = np.max(diff)
        mean_reduction = np.mean(diff[diff > 0]) if np.any(diff > 0) else 0

        logger.info(f"  Bumps affected: {bumps_removed:,} pixels")
        logger.info(f"  Max height reduction: {max_reduction:.2f}m")
        logger.info(f"  Mean reduction (where applied): {mean_reduction:.2f}m")

        # Restore nodata
        if nodata_value is not None and np.any(mask):
            result[mask] = nodata_value

        return result.astype(raster_data.dtype), affine_transform, None

    transform.__name__ = f"remove_bumps(k={kernel_size},{structure},s={strength})"
    return transform
def slope_adaptive_smooth(
    slope_threshold: float = 2.0,
    smooth_sigma: float = 5.0,
    transition_width: float = 1.0,
    nodata_value=np.nan,
    elevation_scale: float = 1.0,
    edge_threshold: float | None = None,
    edge_window: int = 5,
    strength: float = 1.0,
):
    """
    Create a transform that smooths flat areas more aggressively than hilly areas.

    This addresses the problem of buildings/structures appearing as bumps in
    flat regions. Flat areas get strong Gaussian smoothing to remove these
    artifacts, while slopes and hills are preserved with minimal smoothing.

    How it works:
    1. Compute local slope at each pixel using gradient magnitude
    2. Create a smooth weight mask: 1.0 where flat, 0.0 where steep
    3. Apply Gaussian blur to entire DEM
    4. Blend: output = original * (1-weight) + smoothed * weight

    The transition from "flat" to "steep" is smooth (using sigmoid) to avoid
    visible boundaries in the output.

    Args:
        slope_threshold: Slope angle in degrees below which terrain is
            considered "flat" (default: 2.0 degrees).
            - 1.0°: Very aggressive, only smooths nearly horizontal areas
            - 2.0°: Good default, smooths typical flat areas with buildings
            - 5.0°: Smooths gentle slopes too
        smooth_sigma: Gaussian blur sigma in pixels (default: 5.0).
            Controls the strength of smoothing in flat areas.
            - 3.0: Light smoothing
            - 5.0: Moderate smoothing (recommended)
            - 10.0: Very strong smoothing, may blur valid terrain features
        transition_width: Width of transition zone in degrees (default: 1.0).
            Controls how quickly smoothing fades off above threshold.
            - 0.5: Sharp transition
            - 1.0: Smooth transition (recommended)
            - 2.0: Very gradual transition
        nodata_value: Value to treat as no data (default: np.nan)
        elevation_scale: Scale factor that was applied to elevation data
            (default: 1.0). If elevation was scaled (e.g., by
            scale_elevation(0.0001)), pass that factor here so slope
            computation uses real-world elevation differences. The gradient
            is divided by this factor to recover true slopes.
        edge_threshold: Elevation difference threshold for edge preservation
            (default: None). If set, sharp elevation discontinuities (like
            lake boundaries) are preserved. Areas where local elevation range
            exceeds this threshold are not smoothed.
            - None: Disabled (original behavior)
            - 5.0: Preserve edges with >5m elevation change
            - 10.0: Only preserve very sharp edges (>10m change)
            Recommended: 3-10m depending on terrain features to preserve.
        edge_window: Window size for edge detection (default: 5).
            Larger windows detect edges over broader areas but may over-protect.
        strength: Overall smoothing strength multiplier (default: 1.0).
            Scales the maximum smoothing effect in flat areas.
            - 1.0: Full smoothing (original behavior)
            - 0.5: Half the smoothing effect
            - 0.25: Gentle smoothing
            - 0.0: No smoothing (transform has no effect)

    Returns:
        function: A transform function for use with terrain.add_transform()

    Example:
        >>> # Standard: smooth flat areas with >2° slopes preserved
        >>> terrain.add_transform(slope_adaptive_smooth())
        >>> # Aggressive: smooth anything below 5° slope
        >>> terrain.add_transform(slope_adaptive_smooth(slope_threshold=5.0, smooth_sigma=8.0))
        >>> # Conservative: only smooth very flat areas, light blur
        >>> terrain.add_transform(slope_adaptive_smooth(slope_threshold=1.0, smooth_sigma=3.0))
        >>> # Compensate for prior scale_elevation(0.0001)
        >>> terrain.add_transform(slope_adaptive_smooth(elevation_scale=0.0001))
        >>> # Preserve lake boundaries and other sharp edges
        >>> terrain.add_transform(slope_adaptive_smooth(edge_threshold=5.0))
        >>> # Gentle smoothing (25% of full effect)
        >>> terrain.add_transform(slope_adaptive_smooth(strength=0.25))
    """
    logger = logging.getLogger(__name__)

    def transform(raster_data, affine_transform=None):
        """
        Apply slope-adaptive smoothing to DEM.

        Args:
            raster_data: Input raster numpy array (elevation data)
            affine_transform: Optional affine transform (unchanged by smoothing)

        Returns:
            tuple: (smoothed_data, affine_transform, None)
        """
        from scipy.ndimage import gaussian_filter

        strength_str = f", strength={strength}" if strength != 1.0 else ""
        logger.info(
            f"Slope-adaptive smoothing (threshold={slope_threshold}°, "
            f"sigma={smooth_sigma}, transition={transition_width}°{strength_str})"
        )

        # Handle nodata mask
        if np.isnan(nodata_value):
            mask = np.isnan(raster_data)
        else:
            mask = raster_data == nodata_value

        total_valid = np.sum(~mask)

        # Work with float64 for precision
        data = raster_data.astype(np.float64).copy()

        # Fill nodata temporarily for gradient calculation
        if np.any(mask):
            valid_median = np.nanmedian(raster_data[~mask])
            data[mask] = valid_median

        # Compute slope magnitude using Sobel gradients
        # Sobel gives dz/dx and dz/dy in elevation units per pixel
        dy = ndimage.sobel(data, axis=0, mode="reflect") / 8.0  # Sobel normalizes by 8
        dx = ndimage.sobel(data, axis=1, mode="reflect") / 8.0

        # Account for pixel size to get actual slope
        # Affine transform stores pixel dimensions: a=x_size, e=y_size (usually negative)
        # NOTE(review): pixel sizes are logged in meters — assumes a projected
        # CRS; for a geographic CRS (degrees) slopes would be wrong.
        if affine_transform is not None:
            pixel_size_x = abs(affine_transform.a)
            pixel_size_y = abs(affine_transform.e)
            # Scale gradients from per-pixel to per-meter
            dx = dx / pixel_size_x
            dy = dy / pixel_size_y
            logger.info(f"  Pixel size: {pixel_size_x:.1f}m x {pixel_size_y:.1f}m")
        else:
            logger.warning("  No affine transform - assuming 1m pixels (slopes may be wrong)")

        # Compensate for elevation scaling (if elevation was scaled by 0.0001, gradients are too)
        # Divide by elevation_scale to recover true gradient magnitude
        if elevation_scale != 1.0:
            dx = dx / elevation_scale
            dy = dy / elevation_scale
            logger.info(f"  Compensating for elevation_scale={elevation_scale}")

        # Gradient magnitude (rise/run in elevation per meter)
        gradient_magnitude = np.sqrt(dx**2 + dy**2)

        # Convert to slope angle in degrees
        slope_degrees = np.degrees(np.arctan(gradient_magnitude))

        logger.info(
            f"  Slope range: {np.nanmin(slope_degrees):.2f}° to "
            f"{np.nanmax(slope_degrees):.2f}°"
        )
        logger.info(
            f"  Median slope: {np.nanmedian(slope_degrees):.2f}°"
        )

        # Create smooth weight mask using sigmoid
        # weight = 1.0 where flat (slope < threshold), 0.0 where steep
        # Sigmoid: 1 / (1 + exp(k * (x - threshold)))
        # k controls steepness of transition
        k = 4.0 / transition_width  # Scale factor for desired transition width
        smoothing_weight = 1.0 / (1.0 + np.exp(k * (slope_degrees - slope_threshold)))

        # Edge preservation: detect sharp elevation discontinuities and protect them
        if edge_threshold is not None:
            from scipy.ndimage import maximum_filter, minimum_filter

            # Compute local elevation range (max - min in window)
            # This detects sharp edges like lake boundaries regardless of slope
            local_max = maximum_filter(data, size=edge_window, mode="reflect")
            local_min = minimum_filter(data, size=edge_window, mode="reflect")
            local_range = local_max - local_min

            # Account for elevation scaling if applied
            if elevation_scale != 1.0:
                # local_range is in scaled units, threshold is in real units
                effective_threshold = edge_threshold * elevation_scale
            else:
                effective_threshold = edge_threshold

            # Create edge mask: 1.0 near edges, 0.0 elsewhere
            # Use sigmoid for smooth transition
            edge_k = 4.0 / (effective_threshold * 0.5)  # Transition over half the threshold
            edge_weight = 1.0 / (1.0 + np.exp(-edge_k * (local_range - effective_threshold)))

            # Reduce smoothing weight near edges
            # edge_weight=1 means edge detected → smoothing_weight should be 0
            smoothing_weight = smoothing_weight * (1.0 - edge_weight)

            edge_pixels = np.sum(edge_weight > 0.5)
            logger.info(
                f"  Edge preservation: {100*edge_pixels/total_valid:.1f}% pixels near edges "
                f"(threshold={edge_threshold}m, window={edge_window})"
            )

        # Log weight distribution
        flat_pixels = np.sum(smoothing_weight > 0.9)
        steep_pixels = np.sum(smoothing_weight < 0.1)
        logger.info(
            f"  Pixels: {100*flat_pixels/total_valid:.1f}% flat (weight>0.9), "
            f"{100*steep_pixels/total_valid:.1f}% steep (weight<0.1)"
        )

        # Apply Gaussian smoothing to entire DEM
        smoothed = gaussian_filter(data, sigma=smooth_sigma, mode="reflect")

        # Apply strength multiplier to smoothing weight
        # strength=1.0: full effect, strength=0.5: half effect, strength=0.0: no effect
        if strength != 1.0:
            smoothing_weight = smoothing_weight * strength
            logger.info(f"  Strength={strength}: max weight scaled to {np.max(smoothing_weight):.2f}")

        # Blend: output = original * (1-weight) + smoothed * weight
        result = data * (1.0 - smoothing_weight) + smoothed * smoothing_weight

        # Restore nodata values
        result[mask] = nodata_value

        logger.info(
            f"  Value range before: {np.nanmin(raster_data):.2f} to "
            f"{np.nanmax(raster_data):.2f}"
        )
        logger.info(
            f"  Value range after: {np.nanmin(result):.2f} to "
            f"{np.nanmax(result):.2f}"
        )

        return result.astype(raster_data.dtype), affine_transform, None

    # Set descriptive name for logging
    edge_str = f",edge={edge_threshold}m" if edge_threshold is not None else ""
    strength_str = f",s={strength}" if strength != 1.0 else ""
    transform.__name__ = (
        f"slope_adaptive_smooth(θ={slope_threshold}°,σ={smooth_sigma}{edge_str}{strength_str})"
    )
    return transform
def upscale_scores(
    scores: np.ndarray,
    scale: int = 4,
    method: str = "auto",
    nodata_value: float = np.nan,
) -> np.ndarray:
    """
    Upscale score grid to reduce blockiness when applied to terrain.

    Uses AI super-resolution (Real-ESRGAN) when available, falling back to
    fast interpolation methods.

    Args:
        scores: Input score grid (2D numpy array)
        scale: Upscaling factor (default: 4, meaning 4x resolution)
        method: Upscaling method:
            - "auto": Try Real-ESRGAN, then bilinear, then nearest
            - "esrgan": Use Real-ESRGAN (requires optional realesrgan package)
            - "bilateral": Use bilateral filter upscaling (no extra dependencies)
            - "bicubic": Simple bicubic interpolation
        nodata_value: Value treated as no data (default: np.nan)

    Returns:
        Upscaled score grid with smoother gradients

    Note:
        The "esrgan" method requires the optional ``realesrgan`` package::

            pip install realesrgan

        Or install terrain-maker with the upscale extra::

            pip install terrain-maker[upscale]

        Without it, "auto" falls back to bilinear interpolation (and finally
        nearest neighbor), which is fast and needs no ML dependencies.

    Example:
        >>> scores_hires = upscale_scores(sledding_scores, scale=4)
        >>> # Now scores_hires is 4x the resolution with smoother transitions
        >>> # Force bilateral method (no ML dependencies)
        >>> scores_hires = upscale_scores(scores, scale=4, method="bilateral")
    """
    logger = logging.getLogger(__name__)
    logger.info(f"Upscaling scores {scores.shape} by {scale}x using method='{method}'")

    # Handle nodata mask
    if np.isnan(nodata_value):
        mask = np.isnan(scores)
    else:
        mask = scores == nodata_value

    # Fill nodata with nearest valid values for upscaling
    data = scores.copy()
    if np.any(mask):
        from scipy.ndimage import distance_transform_edt

        # Fill nodata with nearest neighbor (indices of closest valid cell)
        indices = distance_transform_edt(mask, return_distances=False, return_indices=True)
        data = data[tuple(indices)]

    # Normalize to 0-1 for processing (required by the ESRGAN uint8 path)
    data_min, data_max = np.nanmin(data), np.nanmax(data)
    if data_max > data_min:
        normalized = (data - data_min) / (data_max - data_min)
    else:
        # Constant grid: avoid division by zero
        normalized = np.zeros_like(data)

    # Try methods in order of preference: quality → speed
    if method == "auto":
        # ESRGAN (best quality) → bilinear (fast, good quality) → nearest (fastest fallback)
        methods_to_try = ["esrgan", "bilinear", "nearest"]
    else:
        methods_to_try = [method]

    result = None
    used_method = None

    for m in methods_to_try:
        try:
            if m == "esrgan":
                result = _upscale_esrgan(normalized, scale)
                used_method = "esrgan"
                break
            elif m == "bilateral":
                result = _upscale_bilateral(normalized, scale)
                used_method = "bilateral"
                break
            elif m == "bicubic":
                result = zoom(normalized, scale, order=3, mode="reflect")
                used_method = "bicubic"
                break
            elif m == "bilinear":
                # Fast upscaling with bilinear interpolation
                result = zoom(normalized, scale, order=1, mode="reflect")
                used_method = "bilinear"
                break
            elif m == "nearest":
                # Fastest upscaling with nearest neighbor
                result = zoom(normalized, scale, order=0, mode="constant")
                used_method = "nearest"
                break
        except ImportError as e:
            # Log at INFO level so user can see why ESRGAN isn't working
            logger.info(f"Method '{m}' not available: {e}")
            logger.info(f"  Falling back to next method...")
            continue
        except Exception as e:
            logger.warning(f"Method '{m}' failed: {e}")
            logger.warning(f"  Falling back to next method...")
            continue

    if result is None:
        # Last resort: fastest possible (nearest neighbor)
        logger.warning("All upscaling methods failed, using fastest fallback (nearest neighbor)")
        result = zoom(normalized, scale, order=0, mode="constant")
        used_method = "nearest"

    # Denormalize back to original range
    result = result * (data_max - data_min) + data_min

    # Clamp to original range (interpolation can overshoot)
    result = np.clip(result, data_min, data_max)

    # Restore nodata mask (upscaled with nearest neighbor so edges stay crisp)
    if np.any(mask):
        mask_upscaled = zoom(mask.astype(np.float32), scale, order=0, mode="constant") > 0.5
        if np.isnan(nodata_value):
            result[mask_upscaled] = np.nan
        else:
            result[mask_upscaled] = nodata_value

    logger.info(f"Upscaled scores: {scores.shape} -> {result.shape} using {used_method}")
    return result.astype(scores.dtype)
def _upscale_esrgan(normalized: np.ndarray, scale: int) -> np.ndarray: """Upscale using Real-ESRGAN neural network. For large scales (>4x), uses multi-step upscaling by chaining multiple ESRGAN passes (e.g., 32x = 4x × 4x × 2x). """ logger = logging.getLogger(__name__) try: from realesrgan import RealESRGANer from basicsr.archs.rrdbnet_arch import RRDBNet import torch except ImportError as e: raise ImportError( f"Real-ESRGAN import failed: {e}\n" f"Install with: uv sync --extra upscale" ) logger.info("Using Real-ESRGAN for AI upscaling...") # Determine device device = torch.device("cuda" if torch.cuda.is_available() else "cpu") logger.info(f" Running on: {device}") # For large scales, chain multiple ESRGAN passes # RealESRGAN-x4plus pre-trained model only supports 4x # Break down: 32x = 4x × 4x × 2x, 16x = 4x × 4x, 8x = 4x × 2x if scale > 4: # Decompose scale into chain of 4x and 2x passes import math num_4x_passes = 0 remaining_scale = scale # Use as many 4x passes as possible while remaining_scale >= 4 and remaining_scale % 4 == 0: num_4x_passes += 1 remaining_scale //= 4 # Handle remaining scale (should be 1, 2, or odd number) if remaining_scale > 1 and remaining_scale != 2: # For odd scales or scales not power-of-2, log warning and round logger.warning( f"Scale {scale} not evenly divisible by 4. 
" f"Using {num_4x_passes} passes of 4x + 1 pass of {remaining_scale}x" ) passes = [(4, num_4x_passes)] if remaining_scale == 2: passes.append((2, 1)) elif remaining_scale > 2: passes.append((remaining_scale, 1)) logger.info(f" Multi-step upscaling for {scale}x: " + " × ".join([f"{s}x" for s, n in passes for _ in range(n)])) # Apply each pass current = normalized try: for step_scale, num_passes in passes: for pass_idx in range(num_passes): current = _upscale_esrgan_single(current, step_scale, device) logger.info(f" ✓ Pass {pass_idx + 1}: {step_scale}x → shape {current.shape}") except Exception as e: logger.error(f" Multi-step ESRGAN failed at pass {pass_idx + 1} (scale={step_scale}x): {e}") raise return current else: # Single pass for 2x or 4x return _upscale_esrgan_single(normalized, scale, device) def _upscale_esrgan_single(normalized: np.ndarray, scale: int, device) -> np.ndarray: """Single-pass ESRGAN upscaling (supports 2x or 4x only).""" logger = logging.getLogger(__name__) from realesrgan import RealESRGANer from basicsr.archs.rrdbnet_arch import RRDBNet # Convert to uint8 image (ESRGAN expects 0-255) img_uint8 = (normalized * 255).astype(np.uint8) # ESRGAN expects HWC format with 3 channels img_rgb = np.stack([img_uint8, img_uint8, img_uint8], axis=-1) # Initialize model with matching scale model = RRDBNet( num_in_ch=3, num_out_ch=3, num_feat=64, num_block=23, num_grow_ch=32, scale=scale # Must match pre-trained weights (2 or 4) ) # Use default Real-ESRGAN pre-trained models from GitHub releases # Download to weights directory if not present import os from pathlib import Path weights_dir = Path.home() / '.cache' / 'realesrgan' / 'weights' weights_dir.mkdir(parents=True, exist_ok=True) if scale == 4: model_filename = 'RealESRGAN_x4plus.pth' model_url = 'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.1.0/RealESRGAN_x4plus.pth' elif scale == 2: model_filename = 'RealESRGAN_x2plus.pth' model_url = 
'https://github.com/xinntao/Real-ESRGAN/releases/download/v0.2.1/RealESRGAN_x2plus.pth' else: raise ValueError(f"Scale {scale} not supported by Real-ESRGAN (only 2x and 4x)") model_path = weights_dir / model_filename # Download if not present if not model_path.exists(): logger.info(f" Downloading {model_filename} (~67MB, one-time)...") import urllib.request urllib.request.urlretrieve(model_url, model_path) logger.info(f" ✓ Downloaded to {model_path}") upsampler = RealESRGANer( scale=scale, model_path=str(model_path), model=model, tile=400, # Process in tiles to save memory tile_pad=10, pre_pad=0, half=device.type == "cuda", # Use half precision on GPU device=device, ) # Run upscaling output, _ = upsampler.enhance(img_rgb, outscale=scale) # Convert back to single channel float result = output[:, :, 0].astype(np.float32) / 255.0 return result def _upscale_bilateral(normalized: np.ndarray, scale: int) -> np.ndarray: """Upscale using bicubic + bilateral filter for edge-preserving smoothness.""" logger = logging.getLogger(__name__) logger.info("Using bilateral upscaling (edge-preserving)...") # First, bicubic upscale upscaled = zoom(normalized, scale, order=3, mode="reflect") # Then apply bilateral filter to smooth while preserving edges try: from skimage.restoration import denoise_bilateral # sigma_spatial controls smoothing extent # sigma_color controls how much color/value difference matters result = denoise_bilateral( upscaled, sigma_color=0.1, sigma_spatial=scale * 1.5, # Scale with upscaling factor channel_axis=None, # Grayscale ) logger.info(f" Applied bilateral filter (sigma_spatial={scale * 1.5})") except ImportError: logger.warning("scikit-image not available, using Gaussian smoothing") from scipy.ndimage import gaussian_filter result = gaussian_filter(upscaled, sigma=scale * 0.5) return result