Source code for pownet.core.record

"""record.py: The SystemRecord class processes stores the modeling outputs from each iteration.
TODO: self.current_hydro, self.current_import for model coupling
"""

import json
import pandas as pd

from ..input import SystemInput
from pownet.data_utils import (
    parse_node_variables,
    parse_flow_variables,
    parse_syswide_variables,
    parse_lmp,
    calc_remaining_on_duration,
    calc_remaining_off_duration,
    write_df,
)


[docs] class SystemRecord: """This class stores modeling outputs from each iteration. The results are stored in three dataframes: node_vars, flow_vars, and syswide_vars. The initial conditions are also stored in the class. """ def __init__( self, system_input: SystemInput, batch_mode: bool = True, keep_record_each_step: bool = False, ) -> None: """Initialize the SystemRecord object. Args: system_input (SystemInput): The input object containing the simulation parameters. """ self.inputs: SystemInput = system_input self.batch_mode: bool = batch_mode self.keep_record_each_step: bool = keep_record_each_step if not batch_mode and not keep_record_each_step: print( '\nWarning: No data will be stored because both "batch_mode" and "keep_record_each_step" are False.' ) # Format of variable name: var(node, t) self.node_vars: pd.DataFrame = pd.DataFrame() # Format of variable name: flow(node_a, node_b, t) self.flow_vars: pd.DataFrame = pd.DataFrame() # Format of variable name: var(t) self.syswide_vars: pd.DataFrame = pd.DataFrame() # Locational marginal prices (LMP) self.lmp_df: pd.DataFrame = pd.DataFrame() self.objvals: list = [] self.runtimes: list = [] # These are vpower, unit status, unit switching, etc. self.current_p: dict[str] = {} self.current_u: dict[str] = {} self.current_v: dict[str] = {} self.current_w: dict[str] = {} self.current_min_on: dict[str] = {} self.current_min_off: dict[str] = {} self.current_charge_state: dict[str] = {}
[docs] def keep( self, runtime: float, objval: float, solution: pd.DataFrame, step_k: int, lmp: dict[str, float] = None, ) -> None: """Keep the simulation results at the current simulation period step_k. Args: runtime (float): The runtime of the model. objval (float): The objective value of the model. solution (pd.DataFrame): The solution dataframe from the model. step_k (int): The current simulation period. lmp (dict[str, float], optional): The locational marginal prices. Defaults to None. Returns: None """ def _extract_vartype_data(df: pd.DataFrame, vartype: str) -> dict[str, float]: """Extracts data for a specific 'vartype' from the DataFrame and converts it to a dictionary.""" return ( df[ (df["vartype"] == vartype) & (df["timestep"] == 24) ] # Only considers values of the last hour .drop("vartype", axis=1) .set_index(["node"]) # Assume generator names do not repeat .to_dict()["value"] ) self.runtimes.append(runtime) self.objvals.append(objval) # Create a col of variable types for filtering pat_vartype = r"(\w+)\[" solution[["vartype"]] = solution["varname"].str.extract( pat_vartype, expand=True ) node_vars = parse_node_variables(solution, self.inputs.sim_horizon, step_k) # Only keep 24-hours as we are doing rolling horizon node_vars = node_vars[node_vars["timestep"] <= 24] ################## # Initial conditions: vpower (p), commitment (u), # startup (v), shutdown (w), and storage's charge_state ################## self.current_p = _extract_vartype_data(node_vars, "vpower") self.current_u = _extract_vartype_data(node_vars, "status") self.current_v = _extract_vartype_data(node_vars, "startup") self.current_w = _extract_vartype_data(node_vars, "shutdown") self.current_charge_state = _extract_vartype_data(node_vars, "charge_state") # Need to calculate the minimum time on/off self.current_min_on = calc_remaining_on_duration( node_vars, sim_horizon=24, thermal_units=self.inputs.thermal_units, TU=self.inputs.TU, ) self.current_min_off = calc_remaining_off_duration( node_vars, sim_horizon=24, thermal_units=self.inputs.thermal_units, TD=self.inputs.TD, ) ################## # Locational Marginal Prices (LMP) ################## if lmp is not None: self.lmp_df = pd.concat( [ self.lmp_df, parse_lmp(lmp, self.inputs.sim_horizon, step_k).drop( "timestep", axis=1 ), ], axis=0, ) ################## # Append results to the existing dataframes in batch mode # otherwise, write to disk at each step_k if specified ################## # Keep outputs from the first 24-hr under rolling horizon for day-ahead planning node_vars = node_vars[node_vars["timestep"] <= 24] node_vars = node_vars.drop(["varname", "timestep"], axis=1) flow_vars = parse_flow_variables( solution=solution, sim_horizon=self.inputs.sim_horizon, step_k=step_k ) flow_vars = flow_vars[flow_vars["timestep"] <= 24] flow_vars = flow_vars.drop("timestep", axis=1) syswide_vars = parse_syswide_variables( solution=solution, sim_horizon=self.inputs.sim_horizon, step_k=step_k ) syswide_vars = syswide_vars[syswide_vars["timestep"] <= 24] syswide_vars = syswide_vars.drop("timestep", axis=1) if self.batch_mode: self.node_vars = pd.concat([self.node_vars, node_vars], axis=0) self.flow_vars = pd.concat([self.flow_vars, flow_vars], axis=0) self.syswide_vars = pd.concat([self.syswide_vars, syswide_vars], axis=0) if self.keep_record_each_step: output_folder = f"{self.inputs.model_id}_outputs" data_to_write = [ (node_vars, f"node_variables_{step_k}"), (flow_vars, f"flow_variables_{step_k}"), (syswide_vars, f"system_variables_{step_k}"), ( pd.DataFrame({"objval": [objval], "runtime": [runtime]}), f"model_stats_{step_k}", ), ] for df, output_name in data_to_write: write_df( df, output_folder=output_folder, output_name=output_name, model_id=self.inputs.model_id, # Use model_name as the identifier )
[docs] def get_init_conds(self) -> dict[str, dict]: """Return the initial conditions for the simulation.""" return { "initial_p": self.current_p, "initial_u": self.current_u, "initial_v": self.current_v, "initial_w": self.current_w, "initial_min_on": self.current_min_on, "initial_min_off": self.current_min_off, "initial_charge_state": self.current_charge_state, }
[docs] def write_init_conds(self, output_folder: str) -> None: init_conds = self.get_init_conds() with open(f"{output_folder}/ilp_init_conds.json", "w") as f: json.dump(init_conds, f, indent=4)
[docs] def get_node_variables(self) -> pd.DataFrame: """Return node-specific variables. These variables include dispatch, unit status, unit switching, etc. """ return self.node_vars
[docs] def get_flow_variables(self) -> pd.DataFrame: """Return the flow variables.""" return self.flow_vars
[docs] def get_systemwide_variables(self) -> pd.DataFrame: """Return the system variables. We currently only have the system-wide spinning reserve shortfall. """ return self.syswide_vars
[docs] def get_runtimes(self) -> list[float]: """Return the model runtime from each iteration.""" return self.runtimes
[docs] def get_objvals(self) -> list[float]: """Return the objective values from each iteration.""" return self.objvals
[docs] def get_lmp(self) -> pd.DataFrame: """Return the locational marginal prices (LMP) data.""" return self.lmp_df.pivot_table( index="hour", columns="node", values="value", aggfunc="first" )
[docs] def get_model_stats(self) -> pd.DataFrame: return pd.DataFrame({"objval": self.objvals, "runtime": self.runtimes})
[docs] def write_simulation_results(self, output_folder: str) -> None: """ Write CSV files containing modeling results to the output directory. Args: output_folder (str): The directory where the output files will be saved. Returns: None """ if not self.batch_mode: print("No data to write because batch_mode is False.") return data_to_write = [ (self.node_vars, "node_variables"), (self.flow_vars, "flow_variables"), (self.syswide_vars, "system_variables"), ( pd.DataFrame({"objval": self.objvals, "runtime": self.runtimes}), "model_stats", ), ] for df, output_name in data_to_write: write_df( df, output_folder=output_folder, output_name=output_name, model_id=self.inputs.model_id, # Use model_name as the identifier ) # Objective values and runtimes write_df( self.get_model_stats(), output_folder=output_folder, output_name="model_stats", model_id=self.inputs.model_id, ) # LMP data if it exists if not self.lmp_df.empty: write_df( self.lmp_df, output_folder=output_folder, output_name="lmp", model_id=self.inputs.model_id, )