Source code for pownet.data_utils

"""data_utils.py: functions for processing user inputs"""

import re
import datetime
import os

import numpy as np
import geopandas as gpd
import pandas as pd
from shapely.geometry import LineString, Point

from .folder_utils import get_database_dir


[docs] def get_dates(year, num_days=365): """Return a dataframe of dates for the given year. The dataframe will have 365 rows, one for each day of the year. The columns are 'date' and 'hour'. Exclude 29th February. """ # Create dates to concatenate with the new dataframes dates = pd.DataFrame( {"date": pd.date_range(start=str(year), periods=num_days + 1, freq="D")} ) # Remove 29th Feb because we do not deal with them dates = dates.loc[dates.date.dt.strftime("%m-%d") != "02-29"] # Remove 1st Jan of the next year in case it is included when it is not a leap year dates = dates.loc[dates.date.dt.strftime("%Y-%m-%d") != f"{year + 1}-01-01"] # In case we need three columns: date, hour, and day dates = dates.loc[dates.index.repeat(24)] dates["hour"] = np.tile(range(1, 25), num_days) dates = dates.reset_index(drop=True) return dates
[docs] def get_datetime_index(year: int) -> pd.DatetimeIndex: """Return a datetime index for the given year. The index will have 8760 entries, one for each hour of the year. Exclude 29th February.""" dates = pd.date_range(start=f"{year}-01-01", end=f"{year + 1}-01-01", freq="h") # Remove 29th February dates = dates[~((dates.month == 2) & (dates.day == 29))] return dates[dates.year == year]
[docs] def remove_29feb(timeseries: pd.Series) -> pd.Series: """Clean and reindex flow data by removing February 29th and resetting the index. Args: timeseries (pd.Series): Timeseries to be cleaned and reindexed. Returns: pd.Series: The cleaned and reindexed flow data. """ # Remove 29th February from the dataset cleaned_data = timeseries[~((timeseries.month == 2) & (timeseries.day == 29))] # Index starts from 1 cleaned_data = cleaned_data.reset_index(drop=True) cleaned_data.index += 1 return cleaned_data
[docs] def create_init_condition( thermal_units: list, storage_units: list = None, ess_max_capacity: dict[str, float] = None, ) -> dict[(str, int), dict]: """Return dicts of system statuses in the format {(unit, hour): value}""" # Assume thermal units in the systems are offline at the beginning initial_p = {unit_g: 0 for unit_g in thermal_units} initial_u = initial_p.copy() initial_v = initial_p.copy() initial_w = initial_p.copy() # Thermal units do not carry any minimum up and down time at the beginning initial_min_on = {unit_g: 0 for unit_g in thermal_units} initial_min_off = initial_min_on.copy() # Energy storage systems start with zero charge if storage_units is None: initial_charge_state = {} elif len(storage_units) > 0: initial_charge_state = {unit: 0 for unit in storage_units} if ess_max_capacity is not None: for unit in ess_max_capacity: initial_charge_state[unit] = ess_max_capacity[unit] else: initial_charge_state = {} return { "initial_p": initial_p, "initial_u": initial_u, "initial_v": initial_v, "initial_w": initial_w, "initial_min_on": initial_min_on, "initial_min_off": initial_min_off, "initial_charge_state": initial_charge_state, }
[docs] def get_node_hour_from_flow_constraint(constraint_name: str) -> tuple[str, int]: """Get the node and hour from the flow constraint name. Args: constraint_name: The name of the constraint. Returns: The node and hour. """ flow_constraint_pattern = re.compile(r"flowBal\[(.+),(\d+)\]") match = flow_constraint_pattern.match(constraint_name) if match: node = match.group(1) hour = int(match.group(2)) return node, hour else: return None, None
[docs] def get_unit_hour_from_varname(var_name: str) -> tuple[str, int]: """Get the unit and hour from the variable name. Args: var_name: The name of the variable. Returns: The unit and hour. """ node_var_pattern = re.compile(r"(\w+)\[(.+),(\d+)\]") match = node_var_pattern.match(var_name) if match: unit = match.group(2) hour = int(match.group(3)) return unit, hour else: raise ValueError("Invalid variable name format")
[docs] def get_edge_hour_from_varname(var_name: str) -> tuple[tuple[str, str], int]: """Get the edge and hour from the variable name: flow_fwd[a,b,t] or flow_bwd[a,b,t]. Args: var_name: The name of the variable. Returns: The edge (tuple of two strings) and hour (int). """ edge_var_pattern = re.compile(r"flow_(?:fwd|bwd)\[([^,]+),([^,]+),(\d+)\]") match = edge_var_pattern.match(var_name) if not match: raise ValueError(f"Invalid variable name format: {var_name}") node1 = match.group(1) node2 = match.group(2) hour = int(match.group(3)) edge = (node1, node2) return edge, hour
[docs] def get_current_time() -> str: return datetime.datetime.now().strftime("%Y%m%d_%H%M")
[docs] def write_df( df: pd.DataFrame, output_folder: str, output_name: str, model_id: str, ) -> None: """Write a dataframe to the output folder. Args: df: The dataframe to write. output_name: The name of the output file. model_id: The model ID. Returns: None """ # First check that the output directory exists. If not, create it. if not os.path.exists(output_folder): os.makedirs(output_folder) df.to_csv( os.path.join(output_folder, f"{model_id}_{output_name}.csv"), index=False, )
[docs] def calc_remaining_duration( solution: pd.DataFrame, sim_horizon: int, thermal_units: list[str], duration_dict: dict[str, int], # Generic for TU or TD vartype: str, # 'startup' or 'shutdown' ) -> dict[str, int]: """Calculates the remaining duration (on or off) for each thermal unit. This function analyzes the provided solution DataFrame to determine the latest timestep at which a specified event (startup or shutdown) occurred for each thermal unit. It then calculates the remaining duration based on the simulation horizon and the unit's minimum required duration. Args: solution: A DataFrame containing the solution of the optimization model. sim_horizon: The length of the simulation horizon. thermal_units: A list of thermal unit names. duration_dict: A dictionary mapping unit names to their respective minimum durations. vartype: The type of event to analyze. Either 'startup' or 'shutdown'. Returns: A dictionary mapping unit names to their remaining durations. Raises: ValueError: If the simulation horizon is shorter than the maximum duration of any thermal unit. """ # This logic does not work if sim_horizon is shorter than the duration if sim_horizon < max(duration_dict.values()): raise ValueError( "The simulation horizon is shorter than the maximum duration of the thermal units." ) remaining_durations = {} for unit in thermal_units: subset = ( solution[(solution["node"] == unit) & (solution["vartype"] == vartype)] .set_index("timestep") .drop(["vartype", "node"], axis=1) ) filtered_df = subset[subset["value"] == 1] if len(filtered_df) > 0: latest_event_timestep = filtered_df.index.max() else: latest_event_timestep = -sim_horizon remaining_durations[unit] = max( 0, duration_dict[unit] - (sim_horizon - latest_event_timestep) - 1 ) return remaining_durations
[docs] def calc_remaining_on_duration( solution: pd.DataFrame, sim_horizon: int, thermal_units: list[str], TU: dict[str, int], ) -> dict[str, int]: """Calculate the remaining online duration for each thermal unit. Args: solution: A DataFrame containing the solution of the optimization model. sim_horizon: The length of the simulation horizon. thermal_units: A list of thermal unit names. TU: A dictionary mapping unit names to their respective minimum online durations. Returns: A dictionary mapping unit names to their remaining online durations. """ return calc_remaining_duration(solution, sim_horizon, thermal_units, TU, "startup")
[docs] def calc_remaining_off_duration( solution: pd.DataFrame, sim_horizon: int, thermal_units: list[str], TD: dict[str, int], ) -> dict[str, int]: """Calculate the remaining shutdown duration for each thermal unit. Args: solution: A DataFrame containing the solution of the optimization model. sim_horizon: The length of the simulation horizon. thermal_units: A list of thermal unit names. TD: A dictionary mapping unit names to their respective minimum offline durations. Returns: A dictionary mapping unit names to their remaining shutdown durations. """ return calc_remaining_duration(solution, sim_horizon, thermal_units, TD, "shutdown")
[docs] def parse_node_variables( solution: pd.DataFrame, sim_horizon: int, step_k: int ) -> pd.DataFrame: """Parse the node variables from the solution DataFrame. Node variables are in the (node, t) format. Also, ensure binary values are rounded to 0 or 1. Args: solution: The solution DataFrame. sim_horizon: The length of the simulation horizon. step_k: The current simulation period. Returns: pd.DataFrame: The node variables DataFrame""" node_var_pattern = r"(\w+)\[(.+),(\d+)\]" current_node_vars = solution[solution["varname"].str.match(node_var_pattern)].copy() # Flow should not be included in the node variables current_node_vars = current_node_vars[ ~current_node_vars["vartype"].isin(["flow_fwd", "flow_bwd"]) ] current_node_vars[["node", "timestep"]] = current_node_vars["varname"].str.extract( node_var_pattern, expand=True )[[1, 2]] current_node_vars["timestep"] = current_node_vars["timestep"].astype(int) current_node_vars["hour"] = current_node_vars["timestep"] + sim_horizon * ( step_k - 1 ) # Rounding binary values current_node_vars.loc[ np.isclose(current_node_vars["value"], 0, atol=1e-4), "value" ] = 0 current_node_vars.loc[ np.isclose(current_node_vars["value"], 1, atol=1e-4), "value" ] = 1 return current_node_vars
[docs] def parse_flow_variables( solution: pd.DataFrame, sim_horizon: int, step_k: int ) -> pd.DataFrame: """ Parses flow variables from the solution DataFrame. The flow variables are expected in the format: flow_fwd[node_a,node_b,t] or flow_bwd[node_a,node_b,t]. Args: solution: The solution DataFrame with a 'varname' column. sim_horizon: The length of the simulation horizon for a single step_k (e.g., 24 hours). step_k: The current simulation period (1-indexed). Returns: pd.DataFrame: A DataFrame with parsed flow variables, including columns for 'node_a', 'node_b', 'type' (fwd/bwd), 'value', 'timestep' (relative to step_k), and 'hour' (absolute). """ # Matches flow_fwd[node_a,node_b,t] or flow_bwd[node_a,node_b,t] # It captures the type (fwd or bwd), node_a, node_b, and t. flow_var_pattern = r"flow_(fwd|bwd)\[([^,]+),([^,]+),(\d+)\]" # Filter rows that match the flow variable pattern flow_vars_mask = solution["varname"].str.contains( r"flow_(?:fwd|bwd)\[.+,.+,\d+\]", regex=True ) cur_flow_vars = solution[flow_vars_mask].copy() if cur_flow_vars.empty: return pd.DataFrame( columns=["node_a", "node_b", "type", "value", "timestep", "hour"] ) # Extract components from varname extracted_data = cur_flow_vars["varname"].str.extract(flow_var_pattern, expand=True) cur_flow_vars[["type", "node_a", "node_b", "timestep"]] = extracted_data # Convert timestep to integer cur_flow_vars["timestep"] = cur_flow_vars["timestep"].astype(int) # Calculate absolute hour # Assuming sim_horizon is the number of timesteps within one step_k # and step_k is 1-indexed. cur_flow_vars["hour"] = cur_flow_vars["timestep"] + sim_horizon * (step_k - 1) final_columns = ["node_a", "node_b", "value", "type", "timestep", "hour"] return cur_flow_vars[final_columns]
[docs] def parse_syswide_variables( solution: pd.DataFrame, sim_horizon: int, step_k: int ) -> pd.DataFrame: """ The system-wide variables are in the (t) format. Args: solution: The solution DataFrame. sim_horizon: The length of the simulation horizon. step_k: The current simulation period. Returns: pd.DataFrame: The system-wide variables DataFrame """ syswide_var_pattern = r"(.+)\[(\d+)\]" cur_syswide_vars = solution[ solution["varname"].str.match(syswide_var_pattern) ].copy() cur_syswide_vars["timestep"] = cur_syswide_vars["varname"].str.extract( syswide_var_pattern, expand=True )[1] cur_syswide_vars["timestep"] = cur_syswide_vars["timestep"].astype(int) cur_syswide_vars["hour"] = cur_syswide_vars["timestep"] + sim_horizon * (step_k - 1) cur_syswide_vars = cur_syswide_vars.drop("varname", axis=1) return cur_syswide_vars
[docs] def parse_lmp(lmp: dict[str, float], sim_horizon: int, step_k: int) -> pd.DataFrame: """Parse the LMP dictionary and return a DataFrame. Args: lmp: The dictionary of LMP values. sim_horizon: The length of the simulation horizon. step_k: The current simulation period. Returns: pd.DataFrame: The LMP DataFrame. """ lmp_df = pd.DataFrame.from_dict(lmp, orient="index", columns=["value"]) lmp_df = lmp_df.reset_index().rename(columns={"index": "name"}) lmp_df[["node", "timestep"]] = lmp_df["name"].str.extract(r"flowBal\[(.*),(\d+)\]") lmp_df["timestep"] = lmp_df["timestep"].astype(int) lmp_df["hour"] = lmp_df["timestep"] + sim_horizon * (step_k - 1) # Keep only the first 24-hours of the simulation lmp_df = lmp_df[lmp_df["timestep"] <= 24] lmp_df = lmp_df.drop(["name"], axis=1) return lmp_df
[docs] def get_fuel_mix_order() -> list[str]: """Return the order of fuel mix for plotting. We use this list to order the node variables dataframe. Returns list[str]: The order of fuel mix. """ return pd.read_csv( os.path.join(get_database_dir(), "fuels.csv"), header=0, )["name"].tolist()
[docs] def get_fuel_color_map() -> dict: """Return a map of fuel type to its color. This is defined in the database folder.""" fuel_color_map = ( pd.read_csv( os.path.join(get_database_dir(), "fuels.csv"), header=0, usecols=["name", "color"], ) .set_index("name") .to_dict()["color"] ) return fuel_color_map
[docs] def get_lines_params() -> pd.DataFrame: """Return a dataframe of line parameters, located in database/transmission_params.csv""" return pd.read_csv( os.path.join(get_database_dir(), "transmission_params.csv"), header=0, )
[docs] def create_geoseries_columns(df: pd.DataFrame) -> pd.DataFrame: df = df.copy() df["geometry"] = df.apply( lambda row: LineString( [(row["source_lon"], row["source_lat"]), (row["sink_lon"], row["sink_lat"])] ), axis=1, ) df["source_location"] = df.apply( lambda row: Point(row["source_lon"], row["source_lat"]), axis=1 ) df["sink_location"] = df.apply( lambda row: Point(row["sink_lon"], row["sink_lat"]), axis=1 ) # Plotting the substations requires columns to be a GeoSeries df["source_location"] = gpd.GeoSeries(df["source_location"]) df["sink_location"] = gpd.GeoSeries(df["sink_location"]) return df
[docs] def get_capacity_value(t: int, unit: str, step_k: int, capacity_df) -> float: """Get the capacity value for a given unit and timestep. Args: t: The timestep. unit: The unit name. step_k: The current simulation period. capacity_df: The dataframe containing the capacity values. Returns: The capacity value for the given unit and timestep. """ hours_per_timestep = 24 # For rolling horizon value = capacity_df.loc[t + (step_k - 1) * hours_per_timestep, unit] if isinstance(value, pd.Series): return value.iloc[0] return value