"""
model.py: PowerSystemModel is wrapper of gurobipy.Model and highspy.Highs
to provide a more user-friendly interface while compatible with HiGHs solver.
"""
import os
import gurobipy as gp
import highspy
import pandas as pd
from pownet.data_utils import (
get_node_hour_from_flow_constraint,
parse_lmp,
parse_node_variables,
)
from .rounding_algo import optimize_with_rounding
import logging
logger = logging.getLogger(__name__)
[docs]
class PowerSystemModel:
def __init__(self, model: gp.Model):
self.model = model
self.solver: str = "gurobi"
# Rounding related variables
self.status_vars: gp.tupledict = None
# Define dictionaries of functions for Gurobi and HiGHs
self.optimize_functions = {
"gurobi": self._optimize_gurobi,
"highs": self._optimize_highs,
}
self.check_feasible_functions = {
"gurobi": self._check_feasible_gurobi,
"highs": self._check_feasible_highs,
}
self.get_objval_functions = {
"gurobi": self._get_objval_gurobi,
"highs": self._get_objval_highs,
}
self.get_status_functions = {
"gurobi": self._get_status_gurobi,
"highs": self._get_status_highs,
}
self.get_solution_functions = {
"gurobi": self.get_solution_gurobi,
"highs": self.get_solution_highs,
}
self.get_runtime_functions = {
"gurobi": self.get_runtime_gurobi,
"highs": self.get_runtime_highs,
}
self.solve_for_lmp_functions = {
"gurobi": self.solve_for_lmp_gurobi,
"highs": self.solve_for_lmp_highs,
}
self.rounding_optimization_time: float = None
self.rounding_iterations: int = None
[docs]
def write_mps(self, output_folder: str, filename: str):
if not isinstance(self.model, gp.Model):
raise ValueError("The model must be a Gurobi model")
if not os.path.exists(output_folder):
os.makedirs(output_folder)
self.model.write(os.path.join(output_folder, f"{filename}.mps"))
def _optimize_gurobi(
self,
log_to_console: bool,
mipgap: float,
timelimit: int,
num_threads: int,
):
self.model.Params.LogToConsole = log_to_console
self.model.Params.MIPGap = mipgap
self.model.Params.TimeLimit = timelimit
self.model.Params.Threads = num_threads
self.model.optimize()
def _optimize_highs(
self, log_to_console: bool, mipgap: float, timelimit: int, num_threads: int
):
# Export the instance to MPS and solve with HiGHs
mps_file = "temp_instance_for_HiGHs.mps"
self.model.write(mps_file)
self.model = highspy.Highs()
self.model.readModel(mps_file)
self.model.setOptionValue("log_to_console", log_to_console)
self.model.setOptionValue("mip_rel_gap", mipgap)
self.model.setOptionValue("time_limit", timelimit)
self.model.setOptionValue("threads", num_threads)
# Must faster than automatic selection
self.model.setOptionValue("solver", "simplex")
self.model.run()
# Delete the MPS file
os.remove(mps_file)
[docs]
def optimize(
self,
solver: str = "gurobi",
log_to_console: bool = True,
mipgap: float = 1e-3,
timelimit: int = 600,
num_threads: int = 0,
):
if solver not in ["gurobi", "highs"]:
raise ValueError("The solver must be either 'gurobi' or 'highs'")
# Update the solver attribute for referencing in other methods
self.solver = solver
self.optimize_functions[self.solver](
log_to_console=log_to_console,
mipgap=mipgap,
timelimit=timelimit,
num_threads=num_threads,
)
[docs]
def optimize_with_rounding(
self,
rounding_strategy: str,
max_rounding_iter: int,
threshold: float = 0,
mipgap: float = 1e-3,
timelimit: int = 600,
num_threads: int = 0,
log_to_console: bool = False,
) -> None:
self.model, self.rounding_optimization_time, self.rounding_iterations = (
optimize_with_rounding(
model=self.model,
rounding_strategy=rounding_strategy,
threshold=threshold,
max_rounding_iter=max_rounding_iter,
log_to_console=log_to_console,
mipgap=mipgap,
timelimit=timelimit,
num_threads=num_threads,
)
)
def _check_feasible_gurobi(self) -> bool:
not_allowed_statuses = [
gp.GRB.Status.INFEASIBLE,
gp.GRB.Status.INF_OR_UNBD,
gp.GRB.Status.UNBOUNDED,
]
return self.model.status not in not_allowed_statuses
def _check_feasible_highs(self) -> bool:
model_status = self.model.getModelStatus()
return self.model.modelStatusToString(model_status) == "Optimal"
[docs]
def check_feasible(self) -> bool:
return self.check_feasible_functions[self.solver]()
[docs]
def write_ilp_mps(self, output_folder: str, instance_name: str):
# Write the infeasible model to a file
self.model.computeIIS()
ilp_file = os.path.join(
output_folder,
f"infeasible_{instance_name}.ilp",
)
self.model.write(ilp_file)
# Write the infeasible model to a MPS file
self.model.write(ilp_file.replace(".ilp", ".mps"))
def _get_objval_gurobi(self) -> float:
return self.model.objVal
def _get_objval_highs(self) -> float:
info = self.model.getInfo()
return info.objective_function_value
[docs]
def get_objval(self) -> float:
return self.get_objval_functions[self.solver]()
def _get_status_gurobi(self):
return self.model.status
def _get_status_highs(self):
return self.model.getModelStatus()
[docs]
def get_status(self):
return self.get_status_functions[self.solver]()
[docs]
def get_model(self):
return self.model
[docs]
def get_solution_gurobi(self) -> dict:
return {
"varname": self.model.getAttr("varname"),
"value": self.model.getAttr("X"),
}
[docs]
def get_solution_highs(self) -> dict:
return {
"varname": [
self.model.getColName(i)[1]
for i in range(self.model.getNumCol()) # getColName returns a tuple
],
"value": self.model.getSolution().col_value,
}
[docs]
def get_solution(self) -> pd.DataFrame:
return pd.DataFrame(self.get_solution_functions[self.solver]())
[docs]
def get_runtime_gurobi(self) -> float:
return self.model.Runtime
[docs]
def get_runtime_highs(self) -> float:
return self.model.getRunTime()
[docs]
def get_runtime(self) -> float:
return self.get_runtime_functions[self.solver]()
[docs]
def solve_for_lmp_gurobi(self) -> dict:
"""Return the locational marginal price (LMP).
Args:
model: The Gurobi model.
Returns:
The LMP at each node.
"""
# Fix the binary variables
model_fixed = self.model.fixed()
# Reoptimize as linear program, which is already obtained by fixing the binary variables
model_fixed.optimize()
# Get the dual variables
pi = model_fixed.getAttr("Pi", model_fixed.getConstrs())
# Filter to only constraints that are nodal balance, which has 'flowBal' in the name
nodal_price = {
constr.ConstrName: pi[i]
for i, constr in enumerate(model_fixed.getConstrs())
if "flowBal" in constr.ConstrName
}
return nodal_price
[docs]
def solve_for_lmp_highs(self) -> dict:
raise NotImplementedError("This method is not implemented for HiGHs solver")
[docs]
def solve_for_lmp(self) -> dict:
return self.solve_for_lmp_functions[self.solver]()
[docs]
def solve_for_export_capacity(
self, shared_nodes: list, sim_horizon: int, step_k: int
) -> tuple:
"""Return the export capacity and hourly prices at the shared nodes"""
# Fix binary variables to simulate fixing unit commitments
model_fixed = self.model.fixed()
# Add export variables to the model with a negative coefficient (minimization problem)
# The value should be high enough to incentivize export but not high enough to create shortfall
export_vars = model_fixed.addVars(
shared_nodes,
range(1, sim_horizon + 1),
vtype=gp.GRB.CONTINUOUS,
obj=-1, # A small negative value should urge the model to export
name="export",
)
# Add export variables to the flow balance at the shared nodes
for constr in model_fixed.getConstrs():
node, t = get_node_hour_from_flow_constraint(constr.ConstrName)
if (node is not None) and (node in shared_nodes):
model_fixed.chgCoeff(
constr,
export_vars[(node, t)],
-1, # This will add the export variable to the demand on the RHS
)
model_fixed.optimize()
# Extract the export capacity
export_capacity = pd.DataFrame(
{
"varname": [v.varname for v in export_vars.values()],
"value": [v.x for v in export_vars.values()],
}
)
# Format the dataframe with additional columns
export_capacity = parse_node_variables(export_capacity, sim_horizon, step_k)
return export_capacity.pivot(index="hour", columns="node", values="value")
[docs]
def solve_for_export_prices(
self, shared_nodes: list, sim_horizon: int, step_k: int
) -> pd.DataFrame:
"""The export prices are locational marginal prices at the shared nodes."""
export_prices = parse_lmp(
lmp=self.solve_for_lmp(),
sim_horizon=sim_horizon,
step_k=step_k,
)
export_prices = export_prices[export_prices["node"].isin(shared_nodes)]
return export_prices.pivot(index="hour", columns="node", values="value")
[docs]
def print_added_constraints(self):
added_constrs = set()
for attr_name in self.model.getConstrs():
constr_type = attr_name.ConstrName.split("[")[0]
added_constrs.add(constr_type)
# Sort the constraints for better readability
added_constrs = sorted(list(added_constrs))
log_message = "\nAdded constraints:\n"
log_message += "\n".join(added_constrs)
logger.warning(log_message)