Blending Rainfall¶
Welcome to this tutorial on using hip-analysis to blend satellite rainfall with MET station data. We will walk you through each step of the analysis: preparing the data, unbiasing the satellite data, computing variograms for kriging interpolation, and blending the data.
First, let's get set up by importing the necessary packages and defining some generic helper functions that we will use later.
import numpy as np
import pandas as pd
import gstools as gs
from tqdm import tqdm
import matplotlib.pyplot as plt
import panel as pn
import holoviews as hv
hv.extension("bokeh", width=90)
from hip.analysis import AnalysisArea
from hip.analysis.ops._kriging import variogram
from hip.analysis.analyses.blend import get_variogram_per_wet_month, blend_rfr, is_in_season
from hip.analysis.visualization.plot_kriging import plot_variogram
from hip.analysis.compute.utils import start_dask, persist_with_progress_bar
# Scatter plot of CHIRP & station values
def scatter_chirp_station(CHIRP: pd.DataFrame, stations: pd.DataFrame):
    max_val = max(np.nanmax(CHIRP.max().values), np.nanmax(stations.max().values))
    return hv.Scatter(
        pd.concat([
            CHIRP.melt(value_name="CHIRP", var_name="station_name"),
            stations.melt(value_name="Station"),
        ], axis=1).loc[:, ["CHIRP", "Station", "station_name"]]
    ).opts(
        color="station_name", show_legend=False, cmap="tab20c", width=400,
        height=400, xlim=(0, max_val), ylim=(0, max_val), tools=["hover"],
    )
# Ratios to unbias satellite data at each time step
def compute_ratios(stations, CHIRP):
    # Mask station values lying 3 standard deviations or more above the daily mean
    condition = stations.sub(stations.mean(axis=1), axis=0).div(stations.std(axis=1), axis=0) < 3
    stations_mean = stations.where(condition, np.nan).mean(axis=1)
    CHIRP_mean = CHIRP.where(condition, np.nan).mean(axis=1)
    # The +1 keeps the ratio stable on near-zero rainfall days
    ratios = (stations_mean + 1) / (CHIRP_mean + 1)
    # Apply no correction when the stations show no spread
    ratios = ratios.where(stations.std(axis=1) > 0, 1)
    # Unbias CHIRP
    CHIRP_U = CHIRP.mul(ratios, axis=0)
    ratios = pd.concat([stations_mean, CHIRP_mean, ratios, stations.notnull().sum(axis=1)], axis=1)
    ratios.columns = ['mean stations', 'mean chirp', 'ratio', 'number stations']
    return CHIRP_U, ratios
# Compute residuals between CHIRP and station data
def compute_residuals(stations, CHIRP):
    return CHIRP - stations
def plot_residuals(res_df):
    return hv.Scatter(
        res_df.melt(value_name="Residuals", var_name="station_name", ignore_index=False)
        .reset_index()
        .loc[:, ["Date", "Residuals", "station_name"]]
    ).opts(
        color="station_name",
        cmap="tab20c",
        size=1,
        show_legend=False,
        width=700,
        height=400,
        tools=["hover"],
    )
Data loading and preparation¶
In this section, we'll load the data and make sure it is in the right format for the blending analysis.
MET station data¶
We need two DataFrames from these station data for our blending procedure (a minimal toy example of both formats follows this list):
latlon: DataFrame containing the spatial coordinates of each station (each station's location must remain fixed over time)
- index: station names
- columns: at least 'Longitude' and 'Latitude'
stations: DataFrame containing the rainfall values at each station location for each date
- index: dates
- columns: station names
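For illustration, here is a minimal sketch of the two expected formats, built with hypothetical stations and values (the real data is loaded from CSV files below):
# Hypothetical two-station example of the expected DataFrame formats
toy_latlon = pd.DataFrame(
    {"Longitude": [33.75, 39.90], "Latitude": [-24.90, -16.22]},
    index=pd.Index(["STATION A", "STATION B"], name="station"),
)
toy_stations = pd.DataFrame(
    {"STATION A": [0.0, 5.7], "STATION B": [8.7, 0.0]},
    index=pd.to_datetime(["2024-03-30", "2024-03-31"]).rename("Date"),
)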
data_path = f"/s3/scratch/public-share/long_term_blending/MOZ/"
latlon = pd.read_csv(data_path + "latlon.csv", index_col = 0)
stations = pd.read_csv(data_path + "precip_met_moz_blending.csv", index_col = 0, parse_dates = ['Date']).iloc[slice(-2000, None)]
latlon.head(5)
| station | Longitude | Latitude |
|---|---|---|
| AEROPORTO FJN | 33.7491 | -24.89531 |
| ANGOCHE | 39.9000 | -16.22000 |
| BEIRA-AEROPORTO | 34.9000 | -19.79810 |
| BEIRA-OBSERVATORIO | 34.8500 | -19.83000 |
| CAIA | 35.3330 | -17.83300 |
stations.tail(4)
| Date | AEROPORTO FJN | ANGOCHE | BEIRA-AEROPORTO | BEIRA-OBSERVATORIO | CAIA | CATANDICA | CHANGALANE | CHANGARA | CHIBUTO | CHICUALACUALA | ... | SONGO | SUSSUNDENGA | TETE - AEROPORTO (CHINGONDZI) | TETE - CIDADE | TSANGANO | ULONGUE | VILANCULOS | XAI-XAI | XAI-XAI-CHANGWENE | ZUMBO |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2024-03-28 | 0.0 | 0.0 | 11.6 | NaN | 0.3 | 4.3 | 0.0 | 0.0 | 0.0 | NaN | ... | 0.0 | NaN | 0.0 | 0.0 | 0.4 | 0.0 | 0.0 | NaN | 0.0 | 0.0 |
| 2024-03-29 | 0.0 | 0.0 | 0.0 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN | ... | 0.0 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN | 0.0 | 0.0 |
| 2024-03-30 | 7.1 | 0.0 | 8.2 | NaN | 0.0 | 0.0 | 3.4 | 0.0 | 0.0 | NaN | ... | 0.0 | NaN | 0.0 | 0.0 | 29.0 | 0.8 | 0.0 | NaN | 0.0 | 0.0 |
| 2024-03-31 | 5.7 | 8.7 | 9.4 | NaN | 3.3 | 3.2 | 1.2 | 0.0 | 0.0 | NaN | ... | 0.0 | NaN | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | NaN | 0.3 | 0.0 |
4 rows × 67 columns
CHIRP data¶
CHIRP is a satellite-only rainfall estimate from the Climate Hazards Center at the University of California, Santa Barbara. It provides gridded rainfall values at a 5 km spatial resolution and at daily or dekadal time steps. CHIRP data is accessible in the Humanitarian Data Cube through hip-analysis. We will need the gridded dataset for the blending analysis, but also to extract its rainfall values at the MET station locations into a DataFrame CHIRP with the same format as stations.
Start a Dask client
This is only needed to parallelize the computation.
client = start_dask()
Use hip.analysis package to load CHIRP data over Mozambique
daterange = f'{stations.index[0].strftime("%Y-%m-%d")}/{stations.index[-1].strftime("%Y-%m-%d")}'
area = AnalysisArea.from_admin_boundaries(
iso3='MOZ', admin_level=0, datetime_range=daterange
)
da_chirp = area.get_dataset(["CHIRP", "RFR_DAILY"])
da_chirp = da_chirp.chunk(dict(time=30, latitude=-1, longitude=-1))
da_chirp = persist_with_progress_bar(da_chirp)
[########################################] | 100% Completed | 1min 55.5s
da_chirp.isel(time=-1).plot.imshow(cmap="Blues")
<matplotlib.image.AxesImage at 0x7f57abee48b0>
Extract values at station locations
CHIRP = pd.DataFrame(index=stations.index, columns=stations.columns)
for s, row in tqdm(latlon.iterrows()):
    values_at_station = da_chirp.sel(latitude=row.Latitude, longitude=row.Longitude, method="nearest")
    CHIRP.loc[values_at_station.time.values, s] = values_at_station.values
CHIRP = CHIRP.astype(float)
CHIRP.tail(4)
67it [00:09, 7.27it/s]
| Date | AEROPORTO FJN | ANGOCHE | BEIRA-AEROPORTO | BEIRA-OBSERVATORIO | CAIA | CATANDICA | CHANGALANE | CHANGARA | CHIBUTO | CHICUALACUALA | ... | SONGO | SUSSUNDENGA | TETE - AEROPORTO (CHINGONDZI) | TETE - CIDADE | TSANGANO | ULONGUE | VILANCULOS | XAI-XAI | XAI-XAI-CHANGWENE | ZUMBO |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2024-03-28 | 0.0 | 6.2 | 1.2 | 1.1 | 0.6 | 1.3 | 0.0 | 0.4 | 0.0 | 0.1 | ... | 0.4 | 0.4 | 1.0 | 1.2 | 2.7 | 1.5 | 0.0 | 0.0 | 0.0 | 0.3 |
| 2024-03-29 | 1.4 | 0.8 | 0.6 | 0.5 | 0.1 | 0.2 | 2.7 | 0.2 | 1.1 | 0.0 | ... | 0.0 | 0.2 | 0.0 | 0.1 | 0.9 | 0.9 | 0.3 | 1.3 | 1.3 | 2.1 |
| 2024-03-30 | 3.0 | 1.4 | 1.9 | 1.7 | 1.7 | 1.9 | 2.2 | 0.0 | 2.6 | 0.7 | ... | 0.0 | 1.0 | 0.0 | 0.0 | 0.1 | 0.1 | 0.4 | 2.8 | 2.8 | 0.1 |
| 2024-03-31 | 0.2 | 1.0 | 1.2 | 1.0 | 2.5 | 2.1 | 0.0 | 0.3 | 0.2 | 0.1 | ... | 0.4 | 0.3 | 0.3 | 0.3 | 0.6 | 0.3 | 0.3 | 0.2 | 0.2 | 0.0 |
4 rows × 67 columns
Unbias CHIRP data and compute residuals¶
To combine CHIRP and station data, we will spatially interpolate (with kriging) the residuals, i.e. the differences between CHIRP and station rainfall values. For kriging to apply, we want to assume that those residuals follow a zero-mean Gaussian process. We therefore first unbias the CHIRP data so that, at each time step, CHIRP and station data share the same mean, which centers the residuals on zero.
Unbias CHIRP with a ratio mean correction
scatter_chirp_station(CHIRP, stations)
CHIRP, ratios = compute_ratios(stations, CHIRP)
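To see what the ratio correction does on a controlled input, here is a toy run through the compute_ratios helper defined above (all values are hypothetical):
# Toy example: the satellite sees half of the station rainfall on day 1,
# and both see no rain on day 2
toy_stations = pd.DataFrame(
    {"A": [10.0, 0.0], "B": [20.0, 0.0], "C": [30.0, 0.0]},
    index=pd.to_datetime(["2024-01-01", "2024-01-02"]),
)
toy_chirp = toy_stations * 0.5
toy_unbiased, toy_ratios = compute_ratios(toy_stations, toy_chirp)
# Day 1: ratio = (20 + 1) / (10 + 1) ≈ 1.91, so CHIRP is scaled up towards the stations
# Day 2: the station standard deviation is 0, so the ratio falls back to 1 (no correction)
toy_ratios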
Compute residuals
residuals = compute_residuals(stations, CHIRP)
plot_residuals(residuals)
Variogram estimation¶
To perform the kriging interpolation, we need to understand how rainfall variability in our data behaves as a function of distance. This is done by computing an empirical semi-variogram (or variogram), which allows us to estimate the Gaussian process characterizing our residuals.
Rainfall variability also changes substantially over time, with different behaviours in the middle of the wet season and during the dry one. Thus, we will estimate several Gaussian models: here, one for each month of the wet season and one for the whole dry season, although this choice depends on the data and the area of study.
We will assume that rainfall variability follows a Gaussian process with an exponential or Matérn covariance kernel (other kernels can be tried).
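For intuition, here is a rough sketch of what estimating and fitting an empirical variogram for a single day looks like with gstools directly; the hip-analysis variogram helper used below wraps a similar workflow and adds distance binning, weighting and month filtering. The bin edges here are hypothetical:
# Empirical variogram of one February day's residuals (illustrative only)
feb_days = residuals.loc[residuals.index.month == 2]
field = feb_days.iloc[0].reindex(latlon.index).astype(float).values
mask = ~np.isnan(field)
bin_edges = np.linspace(0, 15, 30)  # lag bins (hypothetical units and spacing)
bin_center, gamma = gs.vario_estimate(
    (latlon["Longitude"].values[mask], latlon["Latitude"].values[mask]),
    field[mask],
    bin_edges,
)
model = gs.Exponential(dim=2)
model.fit_variogram(bin_center, gamma, nugget=True)  # fit sill, range and nugget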
Variography parameters
bin_size = 20
max_dist = 1500
start_wet_season = 10  # October
end_wet_season = 5  # May
Example of exponential variogram fitting for February
vario_exp_feb = variogram(
    latlon,
    residuals,
    fit_model=gs.Exponential(),
    bin_size=bin_size,
    weight=True,
    max_dist=max_dist,
    month_filter=2,
)
plot_variogram(vario_exp_feb, bin_size)
(<Figure size 640x480 with 1 Axes>,
<Axes: title={'center': 'Global Variogram\n Exponential (r2=0.449): 1.0, 282.39, 0.1617'}>)
Compute one variogram per wet month and one global dry
# Wet months: October through May
wet_months = [m for m in range(1, 13) if is_in_season(m, start_wet_season, end_wet_season)]
# -> [1, 2, 3, 4, 5, 10, 11, 12]
# Use the hip-analysis function to get variograms
vario_outputs_exp = get_variogram_per_wet_month(
    latlon, residuals, fit_model=gs.Exponential(), bin_size=bin_size,
    weight=True, max_dist=max_dist, wet_season=wet_months,
)
vario_outputs_mat = get_variogram_per_wet_month(
    latlon, residuals, fit_model=gs.Matern(), bin_size=bin_size,
    weight=True, max_dist=max_dist, wet_season=wet_months,
    kwargs_fit_model={"nu": 0.3},
)
# Plot variograms
for key in vario_outputs_exp.keys():
    print(8 * "###", " Month ", key, " ", 8 * "###")
    plot_variogram([vario_outputs_exp[key], vario_outputs_mat[key]], bin_size)
######################## Month 1 ########################
######################## Month 2 ########################
######################## Month 3 ########################
######################## Month 4 ########################
######################## Month 5 ########################
######################## Month 10 ########################
######################## Month 11 ########################
######################## Month 12 ########################
######################## Month dry ########################
# Select the best kernel between Exponential and Matern for each month/season
vario_outputs = {}
for s in vario_outputs_exp:
    best_kernel = np.argmax([vario_outputs_exp[s].r2, vario_outputs_mat[s].r2])
    vario_outputs[s] = [vario_outputs_exp, vario_outputs_mat][best_kernel][s]
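As a quick check, we can report which kernel was retained for each month/season (this sketch only relies on the r2 attribute used above):
# Report the retained kernel and its goodness of fit per month/season
for s in vario_outputs:
    kernel = "Matern" if vario_outputs_mat[s].r2 > vario_outputs_exp[s].r2 else "Exponential"
    print(f"{s}: {kernel} (r2 = {vario_outputs[s].r2:.3f})")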
Blending with kriging interpolation¶
# Perform kriging using the hip-analysis blending function and the fitted covariance kernels
da_blended = blend_rfr(
    da_chirp,
    residuals,
    vario_outputs,
    latlon,
    ratios=ratios,
    sigma=2,
    min_values=0,
    start_wet=start_wet_season,
    end_wet=end_wet_season,
)
[########################################] | 100% Completed | 7min 2.7s
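For intuition, kriging one day's residuals onto the CHIRP grid with gstools would look roughly like the sketch below (with hypothetical covariance parameters; blend_rfr applies the month-specific fitted models and its additional filtering internally):
# Sketch: ordinary kriging of a single day's residuals (illustrative only)
day = residuals.index[-1]
vals = residuals.loc[day].reindex(latlon.index).astype(float).values
mask = ~np.isnan(vals)
model = gs.Exponential(dim=2, var=1.0, len_scale=2.5)  # hypothetical parameters
krig = gs.krige.Ordinary(
    model,
    cond_pos=(latlon["Longitude"].values[mask], latlon["Latitude"].values[mask]),
    cond_val=vals[mask],
)
residual_field, krige_var = krig(
    (da_chirp.longitude.values, da_chirp.latitude.values), mesh_type="structured"
)
# The blended field then combines the unbiased CHIRP grid with this interpolated
# residual surface (residuals being defined here as CHIRP minus stations)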
Visualize and compare outputs¶
# Recover physical values using the scale and offset stored on the array
da_blended = da_blended.scale * da_blended + da_blended.offset
# Clip both rasters to the Mozambique boundary
moz_shp = area.get_dataset([area.BASE_AREA_DATASET])
da_blended = da_blended.rio.clip(moz_shp.geometry.values, moz_shp.crs)
da_chirp = da_chirp.rio.clip(moz_shp.geometry.values, moz_shp.crs)
date = '2022-03-31'
vmax = max(da_blended.sel(time=date).max().values, da_chirp.sel(time=date).max().values)
da_blended.sel(time=date).hip.viz.map(title="Blended", cmap='Blues', vmax=vmax)
da_chirp.sel(time=date).hip.viz.map(title="CHIRP", cmap='Blues', vmax=vmax)
(da_chirp.sel(time=date) - da_blended.sel(time=date)).hip.viz.map(title="Difference", cmap='seismic_r')