# -*- coding: utf-8 -*-
"""
Core utility functions and data processing routines used across GaugePredict.
Features:
- Data loading and preprocessing
- File I/O operations
- Configuration management
- Common computational tasks
Used by downloader.py, predict.py, and plotting.py.
"""
from __future__ import division, print_function, absolute_import
import json
import warnings
from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd
# =============================================================================
# Project paths and run configuration
# =============================================================================
[docs]
def get_project_root(file_path, levels_up=1):
"""
Resolve a project root directory by walking up parent folders.
**Inputs** :
file_path : 'str or pathlib.Path'
Path within the project (often __file__ from a module).
levels_up : 'int'
Number of parent levels to traverse. levels_up=1 returns the parent
directory of file_path.
**Outputs** :
project_root : 'pathlib.Path'
Resolved project root directory.
"""
p = Path(file_path).resolve()
return p.parents[int(levels_up)]
[docs]
def resolve_under_project(project_root, rel_path):
"""
Resolve a relative path under a known project root.
**Inputs** :
project_root : 'str or pathlib.Path'
Project root directory.
rel_path : 'str or pathlib.Path'
Relative path under the project root.
**Outputs** :
path : 'pathlib.Path'
Absolute, resolved path under the project root.
"""
return (Path(project_root) / Path(rel_path)).resolve()
[docs]
def load_run_config(run_root):
"""
Load a run configuration JSON from a run directory.
Expects a file named "compute_summary.json" inside run_root.
**Inputs** :
run_root : 'str or pathlib.Path'
Directory containing compute_summary.json.
**Outputs** :
config : 'dict'
Parsed JSON configuration dictionary.
**Raises** :
FileNotFoundError
If compute_summary.json does not exist under run_root.
"""
run_root = Path(run_root)
summary_path = run_root / "compute_summary.json"
if not summary_path.exists():
raise FileNotFoundError(f"compute_summary.json not found: {summary_path}")
with open(summary_path, "r", encoding="utf-8") as f:
return json.load(f)
# =============================================================================
# Geospatial Boundaries
# =============================================================================
[docs]
def load_hucs_3857(base_dir):
"""
Load HUC2 watershed boundary shapefiles and project to EPSG:3857.
Expected layout under base_dir:
HUC??/WBDHU2.shp
**Inputs** :
base_dir : 'str or pathlib.Path'
Root directory containing HUC?? folders.
**Outputs** :
gdf : 'geopandas.GeoDataFrame'
Concatenated watershed boundaries in EPSG:3857.
**Raises** :
FileNotFoundError
If no matching shapefiles are found, or none could be loaded successfully.
"""
base_dir = Path(base_dir)
hits = list(base_dir.glob("HUC??/WBDHU2.shp"))
if not hits:
raise FileNotFoundError(f"No HUC shapefiles found under {base_dir}")
codes = sorted({p.parent.name[-2:] for p in hits})
gdfs = []
for code in codes:
shp = base_dir / f"HUC{code}" / "WBDHU2.shp"
if not shp.exists():
warnings.warn(f"Missing {shp}, skipping")
continue
try:
df = gpd.read_file(shp).rename(columns=str.lower)
except Exception as e:
warnings.warn(f"Failed to read {shp}: {e}")
continue
if "huc2" not in df.columns:
if "huc02" in df.columns:
df = df.rename(columns={"huc02": "huc2"})
else:
df["huc2"] = code
df["huc2"] = df["huc2"].astype(str).str.zfill(2)
gdfs.append(df)
if not gdfs:
raise FileNotFoundError("No valid HUC shapefiles after scanning")
out = gpd.GeoDataFrame(pd.concat(gdfs, ignore_index=True), crs=gdfs[0].crs)
if out.crs is None:
warnings.warn("HUC GeoDataFrame has no CRS; assuming EPSG:4326 before projecting to 3857")
out = out.set_crs(4326)
if out.crs.to_epsg() == 3857:
return out
return out.to_crs(3857)
# =============================================================================
# Time index utilities
# =============================================================================
[docs]
def generate_full_index(start_date, end_date, *, localize=True, tz="UTC"):
"""
Generate a daily date range.
If localize=True, returns a timezone-aware DatetimeIndex in timezone `tz`.
If localize=False, returns a naive DatetimeIndex.
**Inputs** :
start_date, end_date : 'str or datetime-like'
Inclusive bounds accepted by pandas.date_range().
localize : 'bool'
If True, localize/convert the index to timezone `tz`.
tz : 'str'
Timezone name used for localization or conversion.
**Outputs** :
idx : 'pandas.DatetimeIndex'
Daily index from start_date through end_date.
"""
idx = pd.date_range(start=start_date, end=end_date, freq="D")
if localize:
if idx.tz is None:
idx = idx.tz_localize(tz)
else:
idx = idx.tz_convert(tz)
return idx
[docs]
def build_target_dates(full_index, sequence_length, forecast_horizon, n_samples):
"""
Build the target date vector aligned to generated supervised sequences.
For a sequence length T and forecast horizon H, the first target corresponds
to full_index[T + H - 1].
**Inputs** :
full_index : 'pandas.DatetimeIndex'
Full daily index covering the modeling period.
sequence_length : 'int'
Number of past days used per input sample (T).
forecast_horizon : 'int'
Lead time in days between last input day and target day (H).
n_samples : 'int'
Number of supervised samples (typically len(y_seq)).
**Outputs** :
dates : 'numpy.ndarray'
Array of datetime-like values corresponding to each target.
"""
start = int(sequence_length) + int(forecast_horizon) - 1
return np.asarray(full_index[start : start + int(n_samples)])
[docs]
def generate_train_test_masks(full_index, sequence_length, y_seq, forecast_horizon, cutoff_date, tz="UTC"):
"""
Generate boolean masks for train/test split based on a cutoff target date.
Split is on target dates:
- train if target_date < cutoff
- test otherwise
**Inputs** :
full_index : 'pandas.DatetimeIndex'
Full daily index.
sequence_length : 'int'
Input sequence length (T).
y_seq : 'array-like'
Target sequence array used only for its length.
forecast_horizon : 'int'
Forecast horizon (H).
cutoff_date : 'str or datetime-like'
Cutoff date for splitting.
tz : 'str'
Timezone assumption if full_index and cutoff_date are naive.
**Outputs** :
train_mask : 'numpy.ndarray (bool)'
Boolean array of length len(y_seq), True for training samples.
test_mask : 'numpy.ndarray (bool)'
Boolean array of length len(y_seq), True for test samples.
"""
T = int(sequence_length)
H = int(forecast_horizon)
start = T + H - 1
full_utc = _as_utc_daily_index(full_index, tz=tz)
target_dates = np.asarray(full_utc[start : start + int(len(y_seq))])
cutoff = pd.Timestamp(cutoff_date)
if cutoff.tz is None:
cutoff = cutoff.tz_localize(tz)
cutoff = cutoff.tz_convert("UTC")
train_mask = target_dates < cutoff
test_mask = target_dates >= cutoff
return train_mask, test_mask
# =============================================================================
# Timezone helpers (kept for backward compatibility)
# =============================================================================
def _to_utc_index(idx, *, tz="UTC"):
"""
Convert an index-like object to a tz-aware UTC DatetimeIndex.
If timestamps are naive, localize to `tz` first, then convert to UTC.
"""
idx = pd.to_datetime(idx)
if getattr(idx, "tz", None) is None:
idx = idx.tz_localize(tz)
return idx.tz_convert("UTC")
def _full_index_utc(full_index, *, tz="UTC"):
"""
Convert a full_index to a tz-aware UTC DatetimeIndex.
"""
return _to_utc_index(pd.to_datetime(full_index), tz=tz)
def _as_utc_daily_index(full_index, tz="UTC"):
"""
Convert a daily full_index to a tz-aware UTC DatetimeIndex.
"""
idx = pd.DatetimeIndex(full_index)
if idx.tz is None:
idx = idx.tz_localize(tz)
return idx.tz_convert("UTC")
def _normalize_site_no(site_no):
"""
Return both raw and normalized site identifiers.
"""
site_no_raw = str(site_no).strip()
site_no_norm = site_no_raw.lstrip("0")
return site_no_raw, site_no_norm
def _normalize_site_id(site_no):
"""
Backward compatible alias: normalize a site identifier to a clean string.
"""
return str(site_no).strip()
def _normalize_site_id_norm(site_no):
"""
Backward compatible alias: normalize a site identifier with leading zeros removed.
"""
return str(site_no).strip().lstrip("0")
def _series_from_parameter_dict(param_dict, *, tz="UTC"):
"""
Convert a date-keyed dictionary to a daily UTC pandas.Series.
Input format:
{"YYYY-MM-DD": value, ...}
"""
if not isinstance(param_dict, dict) or len(param_dict) == 0:
return pd.Series(dtype=float)
dt = pd.to_datetime(list(param_dict.keys()), errors="coerce")
vals = list(param_dict.values())
s = pd.Series(vals, index=dt)
s = s[~s.index.isna()]
if s.empty:
return pd.Series(dtype=float)
s.index = _to_utc_index(s.index, tz=tz)
s = pd.to_numeric(s, errors="coerce")
return s.sort_index()
# =============================================================================
# Predictor Loading
# =============================================================================
[docs]
def load_data(
data_files,
full_index,
*,
allow_site_ids_norm=None,
tz="UTC",
fill=True,
):
"""
Load predictor time series from one or more cached JSON files.
Expected JSON layout (Layout B only):
{site_no: payload, ...}
Each payload must contain a date-keyed mapping under `data_key`, typically "parameter":
payload[data_key] = {"YYYY-MM-DD": value, ...}
"""
if not isinstance(data_files, (list, tuple)) or len(data_files) == 0:
raise ValueError("data_files must be a non-empty list of dicts with keys {'path', optional 'data_key'}")
allowed = None
if allow_site_ids_norm is not None:
allowed = {str(s).lstrip("0") for s in allow_site_ids_norm}
full_utc = _as_utc_daily_index(full_index, tz=tz)
all_series = []
all_meta = []
def _coerce_float(x):
try:
return float(pd.to_numeric(x, errors="coerce"))
except Exception:
return np.nan
def _try_add_site(site_no, payload, *, data_key, source_path):
if not isinstance(payload, dict):
return
site_no_raw, site_no_norm = _normalize_site_no(site_no)
if allowed is not None and site_no_norm not in allowed:
return
data = payload.get(data_key, None)
if data is None or not isinstance(data, dict) or len(data) == 0:
return
lat = payload.get("latitude", payload.get("lat", np.nan))
lon = payload.get("longitude", payload.get("lon", np.nan))
s = pd.Series(data, dtype="float64")
s.index = pd.to_datetime(s.index, errors="coerce")
s = s[~s.index.isna()]
if s.empty:
return
s = pd.to_numeric(s, errors="coerce")
s = s.replace([-99999, -999999, -9999, -999], np.nan)
s = s.mask(s <= -1e4, np.nan)
if s.index.tz is None:
s.index = s.index.tz_localize(tz)
s.index = s.index.tz_convert("UTC")
s = s.sort_index().reindex(full_utc)
if fill:
s = s.interpolate(limit_direction="both").ffill().bfill()
arr = s.to_numpy(dtype=float)
if not np.isfinite(arr).any():
return
all_series.append(arr.astype(np.float32))
all_meta.append(
{
"site_no": site_no_raw,
"site_no_norm": site_no_norm,
"lat": _coerce_float(lat),
"lon": _coerce_float(lon),
"huc": payload.get("huc", None),
"data_key": str(data_key),
"source_path": str(source_path),
}
)
for spec in data_files:
if not isinstance(spec, dict) or "path" not in spec:
raise ValueError("Each entry in data_files must be a dict containing at least {'path': ...}")
json_path = Path(spec["path"])
data_key = str(spec.get("data_key", "parameter"))
if not json_path.exists():
raise FileNotFoundError(f"Predictor JSON not found: {json_path}")
with json_path.open("r", encoding="utf-8") as f:
obj = json.load(f)
if not isinstance(obj, dict) or len(obj) == 0:
continue
for site_no, payload in obj.items():
_try_add_site(site_no, payload, data_key=data_key, source_path=json_path)
if not all_series:
raise RuntimeError(
"No predictor series loaded.\n"
"Common causes:\n"
" - data_key is wrong (your JSON does not contain that key)\n"
" - allowed_sites filtered everything\n"
" - JSON is not layout B: expected {site_no: payload}\n"
)
X = np.asarray(all_series, dtype=np.float32)
if X.ndim != 2 or X.shape[1] != len(full_utc):
raise RuntimeError(f"Predictor array has wrong shape {X.shape}; expected [C, {len(full_utc)}]")
return X, all_meta
# =============================================================================
# Target CSV loading
# =============================================================================
[docs]
def load_target_csv(csv_path, full_index, *, date_col="date", value_col="value", tz="UTC", fill=True):
"""
Load a daily target time series from a CSV file and align to full_index.
"""
csv_path = Path(csv_path)
if not csv_path.exists():
raise FileNotFoundError(f"Target CSV not found: {csv_path}")
df = pd.read_csv(csv_path)
if date_col not in df.columns:
raise ValueError(f"Target CSV missing date_col='{date_col}'")
if value_col not in df.columns:
raise ValueError(f"Target CSV missing value_col='{value_col}'")
d = pd.to_datetime(df[date_col], errors="coerce")
valid = ~d.isna()
if not bool(valid.any()):
raise ValueError("Target CSV has no parseable dates")
df = df.loc[valid].copy()
d = pd.to_datetime(df[date_col], errors="coerce")
if getattr(d.dt, "tz", None) is None:
d = d.dt.tz_localize(tz)
d = d.dt.tz_convert("UTC")
v = pd.to_numeric(df[value_col], errors="coerce").to_numpy()
s = pd.Series(v, index=d).sort_index()
full_utc = _as_utc_daily_index(full_index, tz=tz)
s = s.reindex(full_utc)
if fill:
s = s.interpolate(limit_direction="both").ffill().bfill()
s.name = str(value_col)
return s
# =============================================================================
# Predictor stack processing + sequence construction
# =============================================================================
[docs]
def process_data(raw_X_data, target_series, *, smooth_window_days=1):
"""
Construct a final predictor stack by appending target-derived channels.
Appends:
- smoothed target (rolling mean)
- first difference of smoothed target
"""
y = pd.to_numeric(pd.Series(target_series), errors="coerce").to_numpy(dtype=float)
if y.ndim != 1:
y = y.reshape(-1)
w = int(max(1, smooth_window_days))
y_sm = pd.Series(y).rolling(window=w, center=True, min_periods=1).mean().to_numpy(dtype=float)
y_diff = np.diff(y_sm, prepend=y_sm[0]).astype(float)
X = np.asarray(raw_X_data, dtype=np.float32)
y_sm = np.asarray(y_sm, dtype=np.float32).reshape(1, -1)
y_diff = np.asarray(y_diff, dtype=np.float32).reshape(1, -1)
if X.shape[1] != y_sm.shape[1]:
raise ValueError("Predictor length does not match target length")
return np.vstack([X, y_sm, y_diff]).astype(np.float32)
[docs]
def generate_sequences(sequence_length, forecast_horizon, x_raw, y):
"""
Convert continuous predictor/target arrays into supervised learning sequences.
**Outputs** :
X_seq: [N, T, C]
y_seq: [N, 1]
"""
T = int(sequence_length)
H = int(forecast_horizon)
y = np.asarray(y, dtype=np.float32).reshape(-1)
x_raw = np.asarray(x_raw, dtype=np.float32)
n = len(y) - T - H + 1
if n <= 0:
raise ValueError("Not enough data to build sequences for the requested sequence_length and forecast_horizon")
x_seq = []
y_seq = []
for i in range(n):
x_seq.append(x_raw[:, i : i + T].T)
y_seq.append(y[i + T + H - 1])
return (
np.asarray(x_seq, dtype=np.float32),
np.asarray(y_seq, dtype=np.float32).reshape(-1, 1),
)