# -*- coding: utf-8 -*-
"""
GaugePredict/downloader.py
Utilities for retrieving USGS NWIS daily-values (DV) time series and assembling
a screened gauge catalog by HUC.
"""
from __future__ import division, print_function, absolute_import
import json
from pathlib import Path
import numpy as np
import pandas as pd
from dataretrieval import nwis
# =============================================================================
# Column selection
# =============================================================================
def _rank_parameter_col(name, preference):
"""
Rank a candidate NWIS daily-values column name based on keyword preference.
Many NWIS DV requests return multiple columns for a parameter code
(e.g., value/mean/sum/total variants). This helper assigns a rank based on
the first matching keyword in `preference`, with lower ranks preferred.
"""
name_l = str(name).lower()
for i, tag in enumerate(preference):
if tag in name_l:
return i
return len(preference)
def _pick_parameter_col(df, parameter_code, *, parameter_kind=None):
"""
Select the preferred NWIS DV column corresponding to a parameter code.
This function filters columns that include `parameter_code` in their name,
then selects the best match using a simple preference order.
For precipitation-like parameters, "sum/total/accum" is prioritized since
daily totals are commonly desired. For other parameters, "mean/value" is
prioritized.
"""
matches = [c for c in df.columns if str(parameter_code) in str(c)]
if not matches:
return None
if str(parameter_kind).lower() in {"precip", "precipitation"}:
preference = ["sum", "total", "accum", "mean", "value", str(parameter_code)]
else:
preference = ["mean", "value", "sum", "total", str(parameter_code)]
return sorted(matches, key=lambda c: _rank_parameter_col(c, preference))[0]
# =============================================================================
# Time + units
# =============================================================================
def _ensure_datetime_index(df):
"""
Ensure a DataFrame is indexed by pandas.DatetimeIndex.
NWIS returns dataframes that are sometimes already indexed by datetime, and
sometimes include a "datetime" column. This helper normalizes both cases.
"""
if isinstance(df.index, pd.DatetimeIndex):
return df
if "datetime" in df.columns:
df = df.copy()
df.index = pd.to_datetime(df["datetime"])
return df
df = df.copy()
df.index = pd.to_datetime(df.index)
return df
def _to_utc_index(idx, *, tz="UTC"):
"""
Convert an index to a tz-aware UTC pandas.DatetimeIndex.
If `idx` is timezone-naive, it is localized to `tz` first, then converted
to UTC. If `idx` already has timezone information, it is directly converted
to UTC.
"""
idx = pd.to_datetime(idx)
if getattr(idx, "tz", None) is None:
idx = idx.tz_localize(tz)
return idx.tz_convert("UTC")
def _as_utc_daily_index(full_index, *, tz="UTC"):
"""
Convert a provided daily index to a tz-aware UTC pandas.DatetimeIndex.
If the index is timezone-naive, it is localized to `tz` and then converted
to UTC.
"""
idx = pd.to_datetime(full_index)
if getattr(idx, "tz", None) is None:
idx = idx.tz_localize(tz)
return idx.tz_convert("UTC")
def _convert_units_from_parameter_code(ts, parameter_code, *, to_units="metric"):
"""
Convert common USGS DV units to metric using parameter code conventions.
Conversions:
- 00060 discharge: cfs -> m^3/s
- 00065 gage height / stage: ft -> m
- 00045 precipitation: inches -> mm
If `to_units` is "native" or None, the series is returned unchanged.
"""
code = str(parameter_code)
if to_units is None or str(to_units).lower() in {"native", "none"}:
return ts
to_units_l = str(to_units).lower()
if to_units_l != "metric":
raise ValueError(f"Unsupported to_units='{to_units}'. Use 'metric' or 'native'.")
ts = ts.astype(float)
if code == "00060":
return ts * 0.0283168466
if code == "00065":
return ts * 0.3048
if code == "00045":
return ts * 25.4
raise ValueError(f"No conversion defined for parameter_code={parameter_code} with to_units='{to_units}'.")
# =============================================================================
# Public API
# =============================================================================
[docs]
def load_target(
target_site,
full_index,
start_date,
end_date,
parameter_code,
*,
to_units="metric",
tz="UTC",
parameter_kind=None,
):
"""
Retrieve NWIS daily-values for a site and align to a provided daily index.
Downloads DV from USGS NWIS for a single site and parameter, converts the
series to a timezone-aware UTC index, and reindexes to `full_index` (daily).
Missing days are filled by interpolation and edge filling (ffill/bfill).
Optionally converts units to metric for a small set of parameter codes.
"""
target_site = str(target_site)
parameter_code = str(parameter_code)
dv = nwis.get_dv(
sites=target_site,
parameterCd=parameter_code,
start=start_date,
end=end_date,
)
df = dv[0].copy() if isinstance(dv, (list, tuple)) else dv.copy()
df = _ensure_datetime_index(df)
df.index = _to_utc_index(df.index, tz=tz)
col = _pick_parameter_col(df, parameter_code, parameter_kind=parameter_kind)
if col is None:
raise RuntimeError(
f"No daily-values column found for parameter_code={parameter_code} at site={target_site}"
)
utc_daily = _as_utc_daily_index(full_index, tz=tz)
ts = (
pd.to_numeric(df[col], errors="coerce")
.sort_index()
.reindex(utc_daily)
.interpolate(limit_direction="both")
.ffill()
.bfill()
)
ts = _convert_units_from_parameter_code(ts, parameter_code, to_units=to_units)
ts.name = parameter_code
return ts
[docs]
def GaugebyHUC(
start_date,
end_date,
huc_codes,
parameter_code,
percent_threshold,
data_dir,
json_path,
siteType=None,
*,
tz="UTC",
):
"""
Build a HUC-grouped gauge catalog from NWIS, screened by data completeness.
For each requested HUC code, this function:
1) Queries NWIS site metadata for the given parameter code.
2) Downloads daily values (DV) for each site for the requested date range.
3) Computes data completeness as (non-NaN days / expected days) * 100.
4) Keeps only sites above `percent_threshold`.
5) Writes a JSON cache keyed by HUC then site number.
"""
data_dir = Path(data_dir)
data_dir.mkdir(parents=True, exist_ok=True)
json_path = Path(json_path)
json_path.parent.mkdir(parents=True, exist_ok=True)
huc_codes = [str(h) for h in huc_codes]
parameter_code = str(parameter_code)
full_index = pd.date_range(start=start_date, end=end_date, freq="D")
expected_days = int(len(full_index))
utc_daily = pd.DatetimeIndex(full_index).tz_localize(tz).tz_convert("UTC")
site_info_list = []
for huc in huc_codes:
kwargs = {"huc": huc, "parameterCd": parameter_code}
if siteType is not None:
kwargs["siteType"] = siteType
sites = nwis.what_sites(**kwargs)
if isinstance(sites, tuple):
sites = sites[0]
if sites is None or len(sites) == 0:
continue
sites = sites.copy()
sites["HUC"] = huc
site_info_list.append(sites)
if not site_info_list:
raise RuntimeError("No sites returned for requested HUC codes and parameter code.")
site_info_df = (
pd.concat(site_info_list, ignore_index=True)
.loc[:, ["site_no", "dec_lat_va", "dec_long_va", "HUC"]]
.rename(
columns={
"dec_lat_va": "latitude",
"dec_long_va": "longitude",
"HUC": "huc",
}
)
)
data_coverage = []
parameter_data = {}
for site_no in site_info_df["site_no"].astype(str).tolist():
try:
dv = nwis.get_dv(
sites=site_no,
parameterCd=parameter_code,
start=start_date,
end=end_date,
)
df = dv[0].copy() if isinstance(dv, (list, tuple)) else dv.copy()
except Exception:
continue
df = _ensure_datetime_index(df)
df.index = _to_utc_index(df.index, tz=tz)
col = _pick_parameter_col(df, parameter_code)
if col is None:
continue
s = pd.to_numeric(df[col], errors="coerce").sort_index().reindex(utc_daily)
valid_days = int(s.notna().sum())
data_coverage.append({"site_no": site_no, "valid_days": valid_days})
parameter_data[site_no] = s
if not parameter_data:
raise RuntimeError("No daily values retrieved for any site.")
coverage_df = pd.DataFrame(data_coverage)
coverage_df["completeness_%"] = 100.0 * coverage_df["valid_days"] / float(expected_days)
coverage_df = (
coverage_df.merge(site_info_df, on="site_no", how="left")
.sort_values("completeness_%", ascending=False)
.reset_index(drop=True)
)
keep = coverage_df["completeness_%"] > float(percent_threshold)
kept_df = coverage_df.loc[keep].copy()
kept_ids = set(kept_df["site_no"].astype(str).tolist())
site_dict = {h: {} for h in huc_codes}
for site_no, s in parameter_data.items():
site_no = str(site_no)
if site_no not in kept_ids:
continue
row = kept_df.loc[kept_df["site_no"].astype(str) == site_no]
if row.empty:
continue
row = row.iloc[0]
huc = str(row["huc"])
s2 = s.dropna()
pairs = ((ts.date().isoformat(), float(v)) for ts, v in s2.items())
site_dict[huc][site_no] = {
"latitude": float(row["latitude"]),
"longitude": float(row["longitude"]),
"completeness_%": float(row["completeness_%"]),
"parameter": dict(pairs),
"cluster": None,
"huc": huc,
}
with json_path.open("w", encoding="utf-8") as f:
json.dump(site_dict, f, sort_keys=True, indent=2)
return {
"num_hucs": int(len(huc_codes)),
"num_sites_total": int(len(site_info_df)),
"num_sites_kept": int(sum(len(v) for v in site_dict.values())),
"json_path": str(json_path.resolve()),
}