Source code for GaugePredict.predict

# -*- coding: utf-8 -*-
"""
Provides neural network models and prediction functions for forecasting downstream
gauge conditions. Includes support for training, inference, and SHAP explainability.

Features:
    - Hybrid neural network models for discharge prediction
    - Model training and validation utilities
    - Batch prediction capabilities
    - SHAP value computation for model interpretability
    - PyTorch-based deep learning infrastructure
"""

import json
import os
import pickle
import platform
import time
from pathlib import Path

import numpy as np
import pandas as pd
import psutil
import shap

try:
    import torch
    from torch import nn
    from torch.utils.data import DataLoader, Dataset
    TORCH_AVAILABLE = True
except ImportError:
    torch = None
    nn = None
    DataLoader = None
    Dataset = None
    TORCH_AVAILABLE = False

from sklearn.metrics import r2_score
from sklearn.preprocessing import StandardScaler

from .downloader import load_target
from .routines import (
    build_target_dates,
    generate_full_index,
    generate_sequences,
    generate_train_test_masks,
    load_data,
    load_target_csv,
    process_data,
)


# =============================================================================
# Utils
# =============================================================================
[docs] def count_parameters(model): """ Count the number of trainable parameters in PyTorch model. **Inputs** : model : 'torch.nn.Module' Model instance. **Outputs** : n_params : 'int' Total number of parameters with requires_grad=True. """ return int(sum(p.numel() for p in model.parameters() if p.requires_grad))
[docs] def get_hardware_info(device_str): """ Collect a hardware and software snapshot for reporting and reproducibility. The returned dictionary is intended for logging. If CUDA is requested and available, GPU properties are also included. **Inputs** : device_str : 'str' Requested device identifier (commonly "cpu" or "cuda"). **Outputs** : info : 'dict' Dictionary containing OS, Python, Torch, CPU, RAM, and optional GPU info. """ info = { "device": str(device_str), "system": platform.system(), "machine": platform.machine(), "processor": platform.processor(), "python_version": platform.python_version(), "torch_version": getattr(torch, "__version__", None), "cpu_count": os.cpu_count(), "ram_gb": psutil.virtual_memory().total / 1024.0**3, } if TORCH_AVAILABLE and str(device_str) == "cuda" and torch.cuda.is_available(): props = torch.cuda.get_device_properties(0) info["gpu_name"] = props.name info["gpu_count"] = torch.cuda.device_count() info["gpu_memory_total_gb"] = props.total_memory / 1024.0**3 return info
# ============================================================================= # Dataset wrapper # =============================================================================
[docs] class SequenceDataset(object): """ Minimal torch.utils.data.Dataset for sequence-to-one regression. Expects: - X: [N, T, C] float-like array - y: [N, 1] or [N] float-like array Stores X and y as float32 tensors and exposes `input_channels` for model consumption. """
[docs] def __init__(self, X, y): """ **Inputs** : X : 'array-like' Feature tensor with shape [N, T, C]. y : 'array-like' Target array with shape [N] or [N, 1]. **Outputs** : dataset : 'SequenceDataset' Dataset containing float32 tensors X and y. """ self._X = np.asarray(X, dtype=np.float32) self._y = np.asarray(y, dtype=np.float32) self.input_channels = int(self._X.shape[2]) if TORCH_AVAILABLE: try: self.X = torch.tensor(self._X, dtype=torch.float32) self.y = torch.tensor(self._y, dtype=torch.float32) except Exception: self.X = self._X self.y = self._y else: self.X = self._X self.y = self._y
def __len__(self): """ Return dataset length. **Outputs** : n : 'int' Number of samples (N). """ return int(self._X.shape[0]) def __getitem__(self, idx): """ Fetch one sample. **Inputs** : idx : 'int' Sample index. **Outputs** : X_i, y_i : 'tuple(torch.Tensor, torch.Tensor)' Feature sequence [T, C] and target [1] (or scalar-shaped tensor). """ x = self.X[idx] y = self.y[idx] return x, y
# ============================================================================= # Data Handling and Model Setup # =============================================================================
[docs] class GaugeDataModel: """ Prepare arrays, splits, scalers, and PyTorch DataLoaders for forecasting. Future updates will allow for CNN and LSTM separation and layer number configuration in notebooks Framework: - Builds a full daily time index for the modeling window. - Loads predictor channels (multiple gauge sites) and a single target series. - Converts predictors + target to aligned arrays, then constructs sliding sequences for supervised learning. - Splits sequences into train/test using a cutoff date. - Standardizes predictors per channel using training statistics. - Standardizes target using sklearn.StandardScaler. Key shapes: - processed_X_data: [C, T] - X_seq: [N, L, C] (L = sequence_length) - y_seq: [N, 1] or [N] """
[docs] def __init__( self, data_files, target_site, start_date, end_date, tz, sequence_length, forecast_horizon, cutoff_date, parameter_code, *, batch_size=64, allowed_site_ids_norm=None, target_csv_path=None, target_csv_date_col=None, target_csv_value_col=None, target_units="metric", target_parameter_kind=None, ): """ Configures data pipeline but does not load data yet. """ self.data_files = data_files self.target_site = target_site self.start_date = start_date self.end_date = end_date self.tz = tz self.sequence_length = int(sequence_length) self.forecast_horizon = int(forecast_horizon) self.cutoff_date = cutoff_date self.parameter_code = parameter_code self.batch_size = int(batch_size) self.full_index = generate_full_index(start_date, end_date, localize=True, tz=tz) if allowed_site_ids_norm: self.allowed_site_ids_norm = {str(s).lstrip("0") for s in allowed_site_ids_norm} else: self.allowed_site_ids_norm = None self.target_csv_path = Path(target_csv_path) if target_csv_path else None self.target_csv_date_col = target_csv_date_col self.target_csv_value_col = target_csv_value_col self.target_units = target_units self.target_parameter_kind = target_parameter_kind self.site_meta_df = None
[docs] def prepare_data(self): """ Load predictors and target and build sequences. Framework: - Loads raw predictor arrays and per-channel metadata via routines.load_data() - Loads the target series via CSV or NWIS - Runs routines.process_data() to align and transform predictors - Creates supervised sequences X_seq and y_seq via routines.generate_sequences() - Builds a metadata table for predictor channels (site id, lat/lon, channel index) **Inputs** : None (uses instance configuration) **Outputs** : None (populates instance attributes: processed_X_data, X_seq, y_seq, y, site_meta_df) """ raw_X_data, site_meta = load_data( self.data_files, self.full_index, allow_site_ids_norm=self.allowed_site_ids_norm, tz=self.tz, ) if self.target_csv_path is not None: target_series = load_target_csv( self.target_csv_path, self.full_index, date_col=self.target_csv_date_col, value_col=self.target_csv_value_col, tz=self.tz, ) else: target_series = load_target( self.target_site, self.full_index, self.start_date, self.end_date, self.parameter_code, to_units=self.target_units, tz=self.tz, parameter_kind=self.target_parameter_kind, ) self.processed_X_data = process_data(raw_X_data, target_series) y = pd.to_numeric(target_series, errors="coerce").to_numpy(dtype=float) self.y = y self.X_seq, self.y_seq = generate_sequences( self.sequence_length, self.forecast_horizon, self.processed_X_data, self.y, ) # predictor channels only n_pred_channels = int(self.processed_X_data.shape[0] - 2) site_meta = site_meta[:n_pred_channels] dfm = pd.DataFrame(site_meta).copy() if "site_no" not in dfm.columns: dfm["site_no"] = "" if "site_no_norm" not in dfm.columns: dfm["site_no_norm"] = dfm["site_no"].astype(str).str.lstrip("0") dfm["site_no"] = dfm["site_no"].astype(str).fillna("") dfm["site_no_norm"] = dfm["site_no_norm"].astype(str).fillna("").str.lstrip("0") dfm["lat"] = pd.to_numeric(dfm.get("lat", np.nan), errors="coerce") dfm["lon"] = pd.to_numeric(dfm.get("lon", np.nan), errors="coerce") dfm["channel_index"] = np.arange(len(dfm), dtype=int) self.site_meta_df = dfm.reset_index(drop=True)
[docs] def split_train_test(self): """ Create boolean masks and split sequences into train and test arrays. Uses routines.generate_train_test_masks() to create per-sequence masks derived from the full daily index, sequence length, forecast horizon, and cutoff date. The masks are then applied to X_seq and y_seq. **Inputs** : None **Outputs** : None (populates train_mask, test_mask, X_train, y_train, X_test, y_test) """ self.train_mask, self.test_mask = generate_train_test_masks( self.full_index, self.sequence_length, self.y_seq, self.forecast_horizon, self.cutoff_date, ) self.X_train = self.X_seq[self.train_mask] self.y_train = self.y_seq[self.train_mask] self.X_test = self.X_seq[self.test_mask] self.y_test = self.y_seq[self.test_mask]
[docs] def normalize(self): """ Normalize predictors and targets using training set statistics. Predictors (X) are standardized per channel using mean and standard deviation computed over the training samples and time dimension. Targets (y) are standardized with sklearn.StandardScaler. """ X_train_flat = self.X_train.reshape(-1, self.X_train.shape[-1]) self.X_mean = np.nanmean(X_train_flat, axis=0) self.X_std = np.nanstd(X_train_flat, axis=0) self.X_std[self.X_std == 0] = 1.0 self.X_train_norm = (self.X_train - self.X_mean) / self.X_std self.X_test_norm = (self.X_test - self.X_mean) / self.X_std self.scaler_y = StandardScaler() self.y_train_scaled = self.scaler_y.fit_transform(self.y_train) self.y_test_scaled = self.scaler_y.transform(self.y_test)
[docs] def create_datasets(self): """ Wrap normalized arrays in SequenceDataset objects and attach site ids. Site ids are stored in both raw and normalized forms: - site_no_raw: original strings (may include leading zeros) - site_no_norm: leading zeros stripped """ self.train_dataset = SequenceDataset(self.X_train_norm, self.y_train_scaled) self.test_dataset = SequenceDataset(self.X_test_norm, self.y_test_scaled) ids_raw = self.site_meta_df["site_no"].astype(str).tolist() ids_norm = self.site_meta_df["site_no_norm"].astype(str).tolist() for ds in (self.train_dataset, self.test_dataset): ds.site_ids = ids_norm ds.site_ids_raw = ids_raw self.site_ids = ids_norm self.site_ids_raw = ids_raw
[docs] def setup(self): """ Run the full data preparation pipeline and create DataLoaders. **Inputs** : None **Outputs** : None (populates dataloader attributes) """ self.prepare_data() self.split_train_test() self.normalize() self.create_datasets() if TORCH_AVAILABLE and DataLoader is not None: use_cuda = torch.cuda.is_available() self.train_dataloader = DataLoader( self.train_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0, pin_memory=use_cuda, ) self.test_dataloader = DataLoader( self.test_dataset, batch_size=self.batch_size, shuffle=False, num_workers=0, pin_memory=use_cuda, ) else: # fall back to None for environments without torch self.train_dataloader = None self.test_dataloader = None
# ============================================================================= # ============================================================================= # Model # ============================================================================= if TORCH_AVAILABLE and nn is not None: class CNN_LSTM(nn.Module): """ CNN-LSTM sequence encoder for regression on daily predictor sequences. """ def __init__(self, input_channels, seq_len): super().__init__() self.register_buffer("channel_mask", None) self.conv1 = nn.Conv1d(input_channels, 128, kernel_size=15, padding=7) self.conv2 = nn.Conv1d(128, 128, kernel_size=7, padding=3) self.conv3 = nn.Conv1d(128, 128, kernel_size=3, padding=1) self.pool = nn.AvgPool1d(kernel_size=3, stride=1, padding=1) self.act = nn.ReLU() self.dropout_cnn = nn.Dropout(p=0.05) self.lstm = nn.LSTM( input_size=128, hidden_size=128, num_layers=3, dropout=0.2, batch_first=True, bidirectional=False, ) self.dropout_fc = nn.Dropout(p=0.35) self.fc = nn.Linear(128, 1) for m in self.modules(): if isinstance(m, nn.Conv1d): nn.init.kaiming_uniform_(m.weight, nonlinearity="relu") if m.bias is not None: nn.init.zeros_(m.bias) elif isinstance(m, nn.Linear): nn.init.kaiming_uniform_(m.weight, nonlinearity="linear") nn.init.zeros_(m.bias)
[docs] @torch.no_grad() def set_channel_mask(self, mask_1d=None): if mask_1d is None: self.channel_mask = None else: self.channel_mask = mask_1d.view(-1)
[docs] def forward(self, x): x = x.permute(0, 2, 1) # [B, T, C] -> [B, C, T] if self.channel_mask is not None: x = x * self.channel_mask.view(1, -1, 1) x = self.act(self.conv1(x)) x = self.act(self.conv2(x)) x = self.act(self.conv3(x)) x = self.dropout_cnn(self.pool(x)) x = x.permute(0, 2, 1) # [B, T, 128] lstm_out, _ = self.lstm(x) last_timestep = lstm_out[:, -1, :] return self.fc(self.dropout_fc(last_timestep))
else:
[docs] class CNN_LSTM: def __init__(self, *args, **kwargs): raise RuntimeError("PyTorch is not available in the current environment; CNN_LSTM requires torch.")
# ============================================================================= # Metrics # =============================================================================
[docs] def nse_score(y_true, y_pred): """ Nash-Sutcliffe Efficiency (NSE) for regression performance. Returns NaN if the denominator is zero (constant observations). **Inputs** : y_true : 'array-like' Observations. y_pred : 'array-like' Predictions. **Outputs** : nse : 'float' Nash-Sutcliffe efficiency (NaN if undefined). """ y_true = np.asarray(y_true, dtype=float) y_pred = np.asarray(y_pred, dtype=float) denom = np.sum((y_true - np.mean(y_true)) ** 2) if denom == 0: return np.nan return 1.0 - np.sum((y_true - y_pred) ** 2) / denom
[docs] def willmott_score(y_true, y_pred): """ Willmott's index of agreement for regression performance. This implementation uses the squared-error form. Returns NaN if the denominator is zero. **Inputs** : y_true : 'array-like' Observations. y_pred : 'array-like' Predictions. **Outputs** : d : 'float' Willmott index of agreement (NaN if undefined). """ y_true = np.asarray(y_true, dtype=float) y_pred = np.asarray(y_pred, dtype=float) denom = np.sum((np.abs(y_pred - np.mean(y_true)) + np.abs(y_true - np.mean(y_true))) ** 2) if denom == 0: return np.nan return 1.0 - np.sum((y_true - y_pred) ** 2) / denom
# ============================================================================= # Trainer and evaluation # =============================================================================
[docs] class Trainer: """ Lightweight training and evaluation loop for a PyTorch regression model. Features: - Device selection (cpu/cuda) - Gradient clipping - Optional warmup learning-rate scaling - Metric evaluation on inverse-transformed targets The trainer assumes the target scaler is used to map model outputs and dataset targets back to native units for metric computation. """
[docs] def __init__( self, model, datamodule, scaler_y, criterion, optimizer, *, device=None, evaluations=None, max_grad_norm=1.0, ): """ **Inputs** : model : 'torch.nn.Module' Model to train. datamodule : 'GaugeDataModel' Prepared data model containing train/test dataloaders. scaler_y : 'sklearn.preprocessing.StandardScaler' Scaler fit on training targets; used to invert predictions and targets. criterion : 'callable' Loss function accepting (yhat, y) tensors and returning a scalar loss. optimizer : 'torch.optim.Optimizer' Optimizer instance. device : 'str or None' Device to run on ("cpu" or "cuda"). If None, auto-select. evaluations : 'dict or None' Mapping of metric name -> function(y_true, y_pred). Defaults to r2, nse, and willmott. max_grad_norm : 'float' Maximum gradient norm for clipping. **Outputs** : Trainer : 'Trainer' Configured trainer instance. """ if evaluations is None: evaluations = {"r2": r2_score, "nse": nse_score, "willmott": willmott_score} if not TORCH_AVAILABLE: raise RuntimeError("PyTorch is not available in the current environment; Trainer requires torch.") self.device = device if device else ("cuda" if torch.cuda.is_available() else "cpu") self.model = model.to(self.device) self.train_dataloader = datamodule.train_dataloader self.test_dataloader = datamodule.test_dataloader self.scaler_y = scaler_y self.criterion = criterion self.optimizer = optimizer self.evaluations = evaluations self.max_grad_norm = float(max_grad_norm) self.history = {}
[docs] def train_epoch(self): """ Train the model for one epoch over the training DataLoader. **Inputs** : None **Outputs** : mean_loss : 'float' Mean training loss over the epoch, weighted by batch size. """ self.model.train() running = 0.0 for Xb, yb in self.train_dataloader: Xb = Xb.to(self.device) yb = yb.to(self.device) self.optimizer.zero_grad(set_to_none=True) yhat = self.model(Xb) loss = self.criterion(yhat, yb) loss.backward() torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm) self.optimizer.step() running += float(loss.item()) * int(Xb.size(0)) return running / float(len(self.train_dataloader.dataset))
[docs] def evaluate(self, dataloader=None): """ Run inference on a dataloader and return inverse-transformed y and yhat. **Inputs** : dataloader : 'torch.utils.data.DataLoader or None' DataLoader to evaluate. Defaults to the test DataLoader. **Outputs** : y_true : 'numpy.ndarray' Target values in original (inverse-transformed) units. y_pred : 'numpy.ndarray' Predicted values in original (inverse-transformed) units. """ if dataloader is None: dataloader = self.test_dataloader self.model.eval() preds, targets = [], [] with torch.no_grad(): for Xb, yb in dataloader: Xb = Xb.to(self.device) yb = yb.to(self.device) yhat = self.model(Xb) preds.append(yhat.detach().cpu().numpy()) targets.append(yb.detach().cpu().numpy()) preds = np.concatenate(preds, axis=0) targets = np.concatenate(targets, axis=0) y_pred = self.scaler_y.inverse_transform(preds) y_true = self.scaler_y.inverse_transform(targets) return y_true, y_pred
[docs] def fit( self, num_epochs, *, evaluate=True, warmup_epochs=15, warmup_scale_mode="linear", warmup_min_scale=0.1, ): """ Train at lower learning rate for a fixed number of epochs with optional metric evaluation. Framework: - If warmup_epochs > 0, learning rates are scaled for early epochs. - warmup_scale_mode="linear" scales from 1/warmup_steps up to 1. - Other modes use a constant warmup_min_scale. **Inputs** : num_epochs : 'int' Number of training epochs. evaluate : 'bool' If True, compute metrics each epoch using the test set. warmup_epochs : 'int' Number of epochs to apply learning-rate warmup. warmup_scale_mode : 'str' Warmup scaling mode ("linear" or constant fallback). warmup_min_scale : 'float' Constant scale factor used when warmup_scale_mode is not "linear". **Outputs** : history : 'dict' Training history with keys: "train_loss" and metric names (if enabled). **Raises** : ValueError If warmup_epochs is negative. """ if int(warmup_epochs) < 0: raise ValueError("warmup_epochs must be >= 0") num_epochs = int(num_epochs) warmup_epochs = int(warmup_epochs) history = {"train_loss": []} if evaluate: for name in self.evaluations.keys(): history[name] = [] base_lrs = [pg["lr"] for pg in self.optimizer.param_groups] warmup_steps = min(num_epochs, warmup_epochs) if warmup_epochs > 0 else 0 for epoch in range(num_epochs): if warmup_epochs > 0 and epoch < warmup_epochs: if warmup_scale_mode == "linear": scale = float(epoch + 1) / float(max(1, warmup_steps)) else: scale = float(warmup_min_scale) for pg, base_lr in zip(self.optimizer.param_groups, base_lrs): pg["lr"] = float(base_lr) * scale train_loss = self.train_epoch() history["train_loss"].append(float(train_loss)) if evaluate: y_true, y_pred = self.evaluate() y_true = y_true.ravel() y_pred = y_pred.ravel() for name, func in self.evaluations.items(): history[name].append(float(func(y_true, y_pred))) print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {train_loss:.4f}") self.history = history return history
[docs] def evaluate_on_test(trainer, gdm): """ Evaluate a trained model on the GaugeDataModel test split. **Inputs** : trainer : 'Trainer' Trained Trainer instance. gdm : 'GaugeDataModel' Data model containing test mask and indexing metadata. **Outputs** : dates : 'pandas.DatetimeIndex or numpy.ndarray' Target dates corresponding to each test prediction. y_true : 'numpy.ndarray' True values in original units. y_pred : 'numpy.ndarray' Predicted values in original units. metrics : 'dict' Dictionary with keys: "r2", "nse", "willmott". """ y_true, y_pred = trainer.evaluate(dataloader=gdm.test_dataloader) y_true = y_true.ravel() y_pred = y_pred.ravel() all_dates = build_target_dates( gdm.full_index, gdm.sequence_length, gdm.forecast_horizon, int(len(gdm.y_seq)), ) dates = all_dates[gdm.test_mask] metrics = { "r2": float(r2_score(y_true, y_pred)), "nse": float(nse_score(y_true, y_pred)), "willmott": float(willmott_score(y_true, y_pred)), } return dates, y_true, y_pred, metrics
# ============================================================================= # SHAP utilities # =============================================================================
[docs] def shap_sites_importance( trainer, gdm, *, background_size=32, nsamples=512, use_test=True, random_state=42, ): """ Compute SHAP attributions and aggregate importance to gauge (site) level. Framework: - Moves the trained model to CPU and sets eval() for SHAP computation. - Selects a random subset of training samples for SHAP background. - Selects a random subset of samples for evaluation (test set by default). - Uses shap.GradientExplainer to compute per-sample attributions. - Aggregates absolute SHAP values across samples and timesteps to obtain a per-channel importance score. - Maps channel importance to site metadata (site_no, lat, lon), then aggregates across channels that belong to the same site. Expected SHAP shape after conversion: [S, T, C]. """ rng = np.random.RandomState(int(random_state)) model = trainer.model orig_device = next(model.parameters()).device was_training = model.training model.to("cpu") model.eval() device = torch.device("cpu") try: n_tr = len(gdm.train_dataset) bg_n = int(min(max(16, int(background_size)), n_tr)) bg_idx = rng.choice(n_tr, size=bg_n, replace=False) bg = gdm.train_dataset.X[bg_idx].to(device) dataset_eval = ( gdm.test_dataset if (use_test and hasattr(gdm, "test_dataset") and len(gdm.test_dataset) > 0) else gdm.train_dataset ) n_ev = len(dataset_eval) ev_n = int(min(max(64, int(nsamples)), n_ev)) ev_idx = rng.choice(n_ev, size=ev_n, replace=False) X_ev = dataset_eval.X[ev_idx].to(device) explainer = shap.GradientExplainer(model, bg) shap_vals = explainer.shap_values(X_ev) finally: model.to(orig_device) model.train() if was_training else model.eval() shap_arr = shap_vals[0] if isinstance(shap_vals, list) else shap_vals if torch.is_tensor(shap_arr): shap_arr = shap_arr.detach().cpu().numpy() if shap_arr.ndim != 3: raise ValueError(f"Unexpected SHAP shape {shap_arr.shape}; expected [S, T, C]") if shap_arr.shape[1] != X_ev.shape[1] or shap_arr.shape[2] != X_ev.shape[2]: raise ValueError("SHAP dimensions do not match input dimensions") phi_per_channel = np.mean(np.abs(shap_arr), axis=(0, 1)) n_site_channels = int(gdm.processed_X_data.shape[0] - 2) phi_pred = phi_per_channel[:n_site_channels] meta = gdm.site_meta_df.iloc[:n_site_channels].copy().reset_index(drop=True) if len(meta) != len(phi_pred): raise ValueError(f"Metadata rows ({len(meta)}) do not match channels ({len(phi_pred)})") meta["importance"] = phi_pred agg = ( meta.groupby("site_no", as_index=False) .agg( lat=("lat", "first"), lon=("lon", "first"), n_channels=("channel_index", "count"), importance=("importance", "sum"), ) .sort_values("importance", ascending=False) .reset_index(drop=True) ) vmax = float(agg["importance"].max()) if len(agg) else 0.0 agg["importance_norm"] = 0.0 if vmax <= 0.0 else agg["importance"] / vmax agg["rank"] = np.arange(1, len(agg) + 1, dtype=int) return agg[["rank", "site_no", "lat", "lon", "importance", "importance_norm", "n_channels"]]
[docs] def export_shap_sites(shap_df, out_dir, *, horizon, min_norm_for_list=0.0): """ Export per-site SHAP importance products to CSV and plain-text site list. Files written: - shap_sites.csv - site_list.txt """ out_dir = Path(out_dir) out_dir.mkdir(parents=True, exist_ok=True) df = shap_df.copy() df["horizon"] = int(horizon) cols = ["site_no", "lat", "lon", "importance", "importance_norm", "n_channels", "horizon"] df[cols].to_csv(out_dir / "shap_sites.csv", index=False) df_list = df if float(min_norm_for_list) <= 0.0 else df[df["importance_norm"] >= float(min_norm_for_list)] with open(out_dir / "site_list.txt", "w", encoding="utf-8") as f: for s in df_list.sort_values(["importance_norm", "importance"], ascending=False)["site_no"].astype(str): f.write(f"{s}\n") return out_dir / "shap_sites.csv", out_dir / "site_list.txt"
# ============================================================================= # Horizon directories and SHAP-based site selection # =============================================================================
[docs] def horizon_dir(run_root, h): """ Construct a standardized subdirectory path for a forecast horizon. Example: horizon_dir("runs", 3) -> Path("runs") / "H03" """ return Path(run_root) / f"H{int(h):02d}"
[docs] def load_previous_shap_df(full_shap_root, h): """ Load a previously exported shap_sites.csv for a given horizon if it exists. If the CSV exists and contains "site_no" but not "site_no_norm", a normalized id column is added by stripping leading zeros. """ shap_csv = horizon_dir(full_shap_root, h) / "shap_sites.csv" if not shap_csv.exists(): return None df = pd.read_csv(shap_csv) if "site_no" in df.columns and "site_no_norm" not in df.columns: df["site_no_norm"] = df["site_no"].astype(str).str.lstrip("0") return df
[docs] def get_allowed_sites_for_horizon( h, *, shap_root=None, site_selection_mode="all", n_shap_by_h=None, default_n_shap=20, ): """ Return the list of allowed predictor site IDs for a given forecast horizon. If ``site_selection_mode`` is "from_shap", this reads the per-horizon ``shap_sites.csv`` file (under ``shap_root/Hxx/``) and returns the top sites by SHAP importance. Otherwise, returns ``None`` to indicate that all sites are allowed. Parameters ---------- h : int or str Forecast horizon (days ahead). shap_root : pathlib.Path or str, optional Root directory containing per-horizon SHAP outputs. site_selection_mode : str, optional Either "from_shap" to limit to top sites or anything else to allow all. n_shap_by_h : dict, optional Mapping horizon -> number of sites to keep. Falls back to ``default_n_shap``. default_n_shap : int, optional Default number of sites to keep when ``n_shap_by_h`` is not provided. Returns ------- list[str] or None List of site_no_norm values (leading zeros stripped) or ``None``. """ if str(site_selection_mode).lower() != "from_shap": return None if shap_root is None: raise ValueError("shap_root must be provided when site_selection_mode='from_shap'") shap_csv = horizon_dir(shap_root, h) / "shap_sites.csv" if not shap_csv.exists(): raise FileNotFoundError( f"site_selection_mode='from_shap' but shap_sites.csv not found for H={h}: {shap_csv}" ) df = pd.read_csv(shap_csv) if "site_no_norm" not in df.columns and "site_no" in df.columns: df["site_no_norm"] = df["site_no"].astype(str).str.lstrip("0") imp_col = "importance_norm" if "importance_norm" in df.columns else "importance" n_sites = int((n_shap_by_h or {}).get(int(h), int(default_n_shap))) if imp_col not in df.columns: return df.get("site_no_norm", pd.Series(dtype=str)).astype(str).str.lstrip("0").tolist() return ( df.sort_values(imp_col, ascending=False) .head(n_sites)["site_no_norm"] .astype(str) .str.lstrip("0") .tolist() )
[docs] def prepare_shap_sites_used(shap_df_used, *, allowed_sites=None, n_sites=None): """ Normalize and optionally filter a SHAP site table for reporting and saving. Framework: - Adds site_no_norm if missing - Filters to allowed_sites if provided (by site_no_norm) - Adds importance_norm if missing (max-normalized importance) - Optionally truncates to top n_sites - Returns a compact table of commonly used columns """ if shap_df_used is None or len(shap_df_used) == 0: return None df = shap_df_used.copy() if "site_no_norm" not in df.columns and "site_no" in df.columns: df["site_no_norm"] = df["site_no"].astype(str).str.lstrip("0") if allowed_sites is not None and "site_no_norm" in df.columns: df = df[df["site_no_norm"].isin(set(allowed_sites))].copy() if "importance_norm" not in df.columns and "importance" in df.columns: vals = df["importance"].to_numpy(dtype=float) vmax = float(np.nanmax(vals)) if len(vals) else 0.0 df["importance_norm"] = 0.0 if vmax <= 0.0 else (vals / vmax) imp_col = "importance_norm" if "importance_norm" in df.columns else ( "importance" if "importance" in df.columns else None ) if n_sites is not None and imp_col is not None and df.shape[0] > int(n_sites): df = df.sort_values(imp_col, ascending=False).head(int(n_sites)).copy() cols_out = [c for c in ["site_no", "site_no_norm", "importance", "importance_norm", "lat", "lon"] if c in df.columns] return df[cols_out].copy() if cols_out else None
# ============================================================================= # Save run artifacts and summaries # =============================================================================
[docs] def save_run_artifacts( out_dir, *, dates_test, y_true, y_pred, metrics, history, model_state_dict, scaler_y, ): """ Save standard run outputs to a directory. Files written: - predictions.csv : date, y_true, y_pred - metrics.json : scalar evaluation metrics - history.json : training history time series - model.pt : torch state dict - scaler_y.pkl : pickled sklearn scaler for inverse transforms """ out_dir = Path(out_dir) out_dir.mkdir(parents=True, exist_ok=True) dates_str = pd.to_datetime(dates_test).astype(str) pd.DataFrame( {"date": dates_str, "y_true": np.asarray(y_true).ravel(), "y_pred": np.asarray(y_pred).ravel()} ).to_csv(out_dir / "predictions.csv", index=False) with open(out_dir / "metrics.json", "w", encoding="utf-8") as f: json.dump({k: float(v) for k, v in metrics.items()}, f, indent=2) with open(out_dir / "history.json", "w", encoding="utf-8") as f: out_hist = {} for k, v in history.items(): if isinstance(v, (list, tuple, np.ndarray)): out_hist[k] = [float(x) for x in v] else: out_hist[k] = float(v) json.dump(out_hist, f, indent=2) torch.save(model_state_dict, out_dir / "model.pt") with open(out_dir / "scaler_y.pkl", "wb") as f: pickle.dump(scaler_y, f)
[docs] def update_compute_summary( compute_summary, *, forecast_horizon=None, fh=None, # backward compatible n_params=None, train_time=None, eval_time=None, metrics=None, hp=None, ): """ Write or update per-horizon results in a compute summary dictionary. Stores a record under: compute_summary["runs"][forecast_horizon] """ if forecast_horizon is None: forecast_horizon = fh if forecast_horizon is None: raise TypeError("update_compute_summary requires forecast_horizon (or fh)") metrics_out = {k: float(v) for k, v in (metrics or {}).items()} hp_out = {} for k, v in (hp or {}).items(): if isinstance(v, (int, float)): hp_out[k] = v elif isinstance(v, np.integer): hp_out[k] = int(v) elif isinstance(v, np.floating): hp_out[k] = float(v) elif isinstance(v, np.datetime64): hp_out[k] = str(v) else: hp_out[k] = str(v) compute_summary["runs"][int(forecast_horizon)] = { "forecast_horizon": int(forecast_horizon), "n_parameters": int(n_params) if n_params is not None else None, "train_time_sec": float(train_time) if train_time is not None else None, "eval_time_sec": float(eval_time) if eval_time is not None else None, "metrics": metrics_out, "hyperparameters": hp_out, } return compute_summary
[docs] def save_compute_summary(results_root, compute_summary): """ Write a compute summary dictionary to compute_summary.json. """ results_root = Path(results_root) results_root.mkdir(parents=True, exist_ok=True) with open(results_root / "compute_summary.json", "w", encoding="utf-8") as f: json.dump(compute_summary, f, indent=2)
# ============================================================================= # Run per horizon model # =============================================================================
[docs] def run_horizon( forecast_horizon, *, data_files, use_csv_target, target_site, target_parameter_code, start_date, end_date, tz, hp, device, allowed_sites=None, csv_path=None, csv_date_col=None, csv_value_col=None, shap_mode="none", site_selection_mode="all", full_shap_root=None, n_sites=None, out_dir=None, shap_random_state=42, target_units="metric", target_parameter_kind=None, ): """ Train and evaluate a model for a single forecast horizon, optionally with SHAP. Framework: - Builds a GaugeDataModel and prepares DataLoaders - Instantiates the CNN_LSTM model and optimizer - Trains for hp["epochs"] epochs - Evaluates on the test split and returns metrics and predictions - Optionally computes SHAP site importance ("run") or loads previous SHAP results - Optionally writes run artifacts to out_dir """ target_units_nwis = target_units if not use_csv_target: tu = str(target_units).lower() if tu in {"m", "meter", "meters", "metre", "metres"}: target_units_nwis = "metric" elif tu in {"ft", "feet", "foot"}: target_units_nwis = "customary" gdm = GaugeDataModel( data_files=data_files, target_site=None if use_csv_target else target_site, start_date=start_date, end_date=end_date, tz=tz, sequence_length=hp["sequence_length"], forecast_horizon=int(forecast_horizon), cutoff_date=hp["cutoff_date"], parameter_code=None if use_csv_target else target_parameter_code, batch_size=hp["batch_size"], allowed_site_ids_norm=allowed_sites, target_csv_path=str(csv_path) if (use_csv_target and csv_path is not None) else None, target_csv_date_col=csv_date_col if use_csv_target else None, target_csv_value_col=csv_value_col if use_csv_target else None, target_units=target_units_nwis, target_parameter_kind=target_parameter_kind, ) gdm.setup() model = CNN_LSTM(gdm.train_dataset.input_channels, hp["sequence_length"]) model.dropout_cnn.p = float(hp.get("dropout_cnn", model.dropout_cnn.p)) model.lstm.dropout = float(hp.get("dropout_lstm", model.lstm.dropout)) model.dropout_fc.p = float(hp.get("dropout_fc", model.dropout_fc.p)) n_params = count_parameters(model) optimizer = torch.optim.Adam( model.parameters(), lr=float(hp["learning_rate"]), weight_decay=float(hp["weight_decay"]), ) criterion = hp["loss_function"] trainer = Trainer( model=model, datamodule=gdm, scaler_y=gdm.scaler_y, criterion=criterion, optimizer=optimizer, device=device, max_grad_norm=float(hp["max_grad_norm"]), ) t0 = time.perf_counter() trainer.fit(int(hp["epochs"])) train_time = time.perf_counter() - t0 t1 = time.perf_counter() dates_test, y_true, y_pred, metrics = evaluate_on_test(trainer, gdm) eval_time = time.perf_counter() - t1 if out_dir is not None: save_run_artifacts( out_dir, dates_test=dates_test, y_true=y_true, y_pred=y_pred, metrics=metrics, history=trainer.history, model_state_dict=trainer.model.state_dict(), scaler_y=gdm.scaler_y, ) shap_df_used = None if shap_mode == "run": shap_df_run = shap_sites_importance( trainer, gdm, background_size=int(hp["background_size"]), nsamples=int(hp["nsamples"]), use_test=True, random_state=shap_random_state, ) if out_dir is not None: export_shap_sites(shap_df_run, Path(out_dir), horizon=int(forecast_horizon)) shap_df_used = shap_df_run.copy() else: if full_shap_root is not None: shap_df_prev = load_previous_shap_df(full_shap_root, int(forecast_horizon)) if shap_df_prev is not None: shap_df_used = shap_df_prev.copy() shap_used_out = prepare_shap_sites_used( shap_df_used, allowed_sites=(allowed_sites if site_selection_mode == "from_shap" else None), n_sites=n_sites, ) if shap_used_out is not None and out_dir is not None: shap_used_out.to_csv(Path(out_dir) / "shap_sites_used.csv", index=False) return { "dates_test": dates_test, "y_true": y_true, "y_pred": y_pred, "metrics": metrics, "history": trainer.history, "n_params": n_params, "train_time": train_time, "eval_time": eval_time, "shap_sites_used": shap_used_out, }