# Source code for pownet.reservoir.manager

"""manager.py: ReservoirManager class for managing reservoir operations and simulations."""

import os
import pandas as pd

from .reservoir import Reservoir
from ..data_model import ReservoirParams
from .reservoir_functions import (
    find_upstream_units,
    find_downstream_flow_fractions,
    find_simulation_order,
)


def find_upstream_flow(
    reservoir: Reservoir, reservoirs: dict[str, Reservoir]
) -> pd.Series:
    """Sum the flow that reaches ``reservoir`` from its upstream units.

    For every name in ``reservoir.upstream_units`` the upstream reservoir's
    ``release`` and ``spill`` are added together and scaled by the entry of
    that upstream reservoir's ``downstream_flow_fracs`` keyed by this
    reservoir's name.

    Args:
        reservoir: The reservoir receiving the flow.
        reservoirs: Lookup of every reservoir by unit name; it must contain
            each unit listed in ``reservoir.upstream_units``.

    Returns:
        A series named ``"upstream_flow"`` indexed ``1..reservoir.sim_days``.
        It is all zeros when the reservoir has no upstream units.
    """
    target_name = reservoir.name
    day_index = range(1, reservoir.sim_days + 1)
    flow_into_target = pd.Series(0, index=day_index, name="upstream_flow")

    for source_name in reservoir.upstream_units:
        source = reservoirs[source_name]
        # Everything leaving the upstream unit: turbine release plus spill.
        outflow = source.release + source.spill
        # Only the share routed towards this reservoir counts.
        share = source.downstream_flow_fracs[target_name]
        # In-place accumulation keeps the series object (and its name).
        flow_into_target += outflow * share

    return flow_into_target
class ReservoirManager:
    """Hold a set of reservoirs and run them in network (upstream-first) order.

    ``reservoirs`` maps a unit name to its ``Reservoir`` object and
    ``simulation_order`` is the list of unit names produced by
    ``find_simulation_order``. Both are filled by
    ``load_reservoirs_from_csv``; every other method iterates over
    ``simulation_order``.
    """

    def __init__(self):
        self.reservoirs: dict[str, Reservoir] = {}
        self.simulation_order: list[str] = []

    def load_reservoirs_from_csv(self, input_folder: str) -> None:
        """Load reservoir units, flow paths and time series from CSV files.

        Reads ``reservoir_unit.csv``, ``flow_path.csv``, ``inflow.csv`` and
        ``minimum_flow.csv`` from ``input_folder``, creates one ``Reservoir``
        per row of ``reservoir_unit.csv`` and stores the simulation order.

        Args:
            input_folder: Directory that contains the CSV files.

        Raises:
            FileNotFoundError: If ``flow_path.csv`` does not exist. The flow
                paths are required by every later step, so the absence is
                reported here instead of surfacing as an unbound local
                variable further down.
        """
        # ------------------------------------------------------------------
        # Read CSV files
        # ------------------------------------------------------------------
        # Reservoir units
        reservoir_data = pd.read_csv(
            os.path.join(input_folder, "reservoir_unit.csv"), header=0
        )

        # Flow paths
        filepath = os.path.join(input_folder, "flow_path.csv")
        if not os.path.exists(filepath):
            raise FileNotFoundError(f"Flow path file not found: {filepath}")
        flow_paths = pd.read_csv(filepath, header=0)

        # Inflow and minimum flow time series.
        # Time series indexing starts at 1, hence the shift of the default
        # zero-based index.
        inflow_ts = pd.read_csv(os.path.join(input_folder, "inflow.csv"), header=0)
        inflow_ts.index += 1
        minflow_ts = pd.read_csv(
            os.path.join(input_folder, "minimum_flow.csv"), header=0
        )
        minflow_ts.index += 1

        # ------------------------------------------------------------------
        # Instantiate Reservoir objects
        # ------------------------------------------------------------------
        for _, row in reservoir_data.iterrows():
            unit_name = row["name"]
            upstream_units = find_upstream_units(flow_paths, unit_name)
            downstream_flow_fracs = find_downstream_flow_fractions(
                flow_paths, unit_name
            )

            params = ReservoirParams(
                name=unit_name,
                min_day=int(row["min_day"]),
                max_day=int(row["max_day"]),
                min_level=float(row["min_level"]),
                max_level=float(row["max_level"]),
                max_head=float(row["max_head"]),
                max_storage=float(row["max_storage"]),
                max_release=float(row["max_release"]),
                max_generation=float(row["max_generation"]),
                turbine_factor=float(row["turbine_factor"]),
                inflow_ts=inflow_ts[unit_name],
                minflow_ts=minflow_ts[unit_name],
                upstream_units=upstream_units,
                downstream_flow_fracs=downstream_flow_fracs,
            )

            # Create a new Reservoir object and register it under its name
            self.reservoirs[unit_name] = Reservoir(params)

        # ------------------------------------------------------------------
        # Process the network topology
        # ------------------------------------------------------------------
        self.simulation_order = find_simulation_order(
            reservoir_names=self.reservoirs.keys(), flow_paths=flow_paths
        )

    def simulate(self) -> None:
        """Simulate the reservoir operations to get hydropower time series.

        Reservoirs are processed in ``simulation_order`` so that the release
        and spill of upstream units are available when a downstream unit's
        upstream flow is computed.
        """
        for unit_name in self.simulation_order:
            reservoir = self.reservoirs[unit_name]
            total_upstream_flow = find_upstream_flow(reservoir, self.reservoirs)
            reservoir.set_upstream_flow(total_upstream_flow)
            reservoir.simulate()

    def get_hydropower_ts(
        self, unit_node_mapping: dict[str, str] = None
    ) -> pd.DataFrame:
        """Get the hydropower time series for all reservoirs.

        Args:
            unit_node_mapping: Optional mapping from unit name to a second
                label. When given (and non-empty) the columns become a
                two-level ``MultiIndex`` of ``(unit, unit_node_mapping[unit])``.

        Returns:
            A DataFrame with one column per unit in ``simulation_order``,
            holding that reservoir's ``daily_hydropower`` values, and a row
            index that starts at 1. The frame is empty when there are no
            reservoirs.

        Raises:
            KeyError: If ``unit_node_mapping`` is given but lacks a unit.
        """
        # Collect one single-column frame per unit and concatenate once;
        # concatenating inside the loop copies the growing frame every time.
        frames = [
            pd.DataFrame({unit_name: self.reservoirs[unit_name].daily_hydropower.values})
            for unit_name in self.simulation_order
        ]
        # pd.concat rejects an empty list, so fall back to an empty frame.
        df = pd.concat(frames, axis=1) if frames else pd.DataFrame()
        df.index = range(1, len(df) + 1)

        # Create multi-level column index if unit_node_mapping is provided
        if unit_node_mapping:
            df.columns = pd.MultiIndex.from_tuples(
                [(unit, unit_node_mapping[unit]) for unit in df.columns]
            )
        return df

    def write_hydropower_to_csv(
        self, output_filepath: str, unit_node_mapping: dict[str, str] = None
    ) -> None:
        """Write the hydropower time series to a CSV file.

        Args:
            output_filepath: Destination path of the CSV file.
            unit_node_mapping: Forwarded to ``get_hydropower_ts``.

        The row index is not written to the file.
        """
        hydropower_df = self.get_hydropower_ts(unit_node_mapping)
        hydropower_df.to_csv(output_filepath, index=False)

    def reoperate(
        self, daily_dispatch: dict[tuple[str, int], float], days_in_step: range
    ) -> dict[tuple[str, int], float]:
        """Reoperate the reservoirs based on the daily dispatch of the power system model.

        Args:
            daily_dispatch: Dispatch value for every ``(unit_name, day)`` pair.
            days_in_step: The days to reoperate.

        Returns:
            A dict keyed like ``daily_dispatch``. Every key starts at 0 and
            the entries for ``(unit_name, day)`` pairs visited here are
            replaced by the value returned from ``Reservoir.reoperate``.

        Raises:
            KeyError: If ``daily_dispatch`` has no entry for a visited
                ``(unit_name, day)`` pair.

        NOTE(review): the previous docstring stated that the first day of the
        simulation period is not reoperated. Nothing in this method skips a
        day, so that is presumably enforced by the caller or inside
        ``Reservoir.reoperate`` - confirm.
        """
        proposed_capacity = {k: 0 for k in daily_dispatch.keys()}

        for unit_name in self.simulation_order:
            # The reservoir object does not change across days.
            reservoir = self.reservoirs[unit_name]
            for day in days_in_step:
                # Find the upstream flow
                total_upstream_flow = find_upstream_flow(reservoir, self.reservoirs)
                # Simulate
                reservoir.set_upstream_flow(total_upstream_flow)
                proposed_capacity[unit_name, day] = reservoir.reoperate(
                    day=day,
                    daily_dispatch=daily_dispatch[unit_name, day],
                    upstream_flow_t=total_upstream_flow.loc[day],
                )

        return proposed_capacity