import datetime as dt
import logging
import os
import sys
import numpy as np
import pandas as pd
from ..common import commons
try:
from future.builtins import int
except ImportError:
pass
DEFAULTS = {'ReservoirLevelInitial': 0.5, 'ReservoirLevelFinal': 0.5, 'ValueOfLostLoad': 1E5,
'PriceOfSpillage': 1, 'WaterValue': 100, 'ShareOfQuickStartUnits': 0.5, 'PriceOfAmmonia': 0,
'PriceOfNuclear': 0, 'PriceOfBlackCoal': 0, 'PriceOfGas': 0, 'PriceOfFuelOil': 0, 'PriceOfBiomass': 0,
'PriceOfCO2': 0, 'PriceOfLignite': 0, 'PriceOfPeat': 0, 'LoadShedding': 0, 'CostHeatSlack': 0,
'CostLoadShedding': 100, 'ShareOfFlexibleDemand': 0, 'DemandFlexibility': 0, 'PriceTransmission': 0,
'CostH2Slack': 75}
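# Illustrative sketch (not part of the original module): DEFAULTS is used by the load_config_*
# functions below to fill configuration entries that were left empty in the config file.
# The 'config' dictionary in this example is hypothetical:
#
#     config = {'default': {'WaterValue': '', 'PriceOfCO2': 25}}
#     for param in DEFAULTS:
#         if config['default'].get(param, '') == '':
#             config['default'][param] = DEFAULTS[param]   # e.g. WaterValue falls back to 100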
def NodeBasedTable(varname, config, default=None):
"""
This function loads the tabular data stored in csv files for each zone of the simulation.
:param varname: Variable name (as defined in config)
:param config: Dispa-SET config data
:param default: Default value to be applied if no data is found
:return: Dataframe with the time series for each zone
"""
path = config[varname]
zones = config['zones']
paths = {}
if os.path.isfile(path):
paths['all'] = path
SingleFile = True
elif '##' in path:
for z in zones:
path_c = path.replace('##', str(z))
if os.path.isfile(path_c):
paths[str(z)] = path_c
else:
logging.critical(
'No data file found for the table ' + varname + ' and zone ' + z + '. File ' + path_c +
' does not exist')
sys.exit(1)
SingleFile = False
elif path != '':
logging.critical(
'A path has been specified for table ' + varname + ' (' + path + ') but no file has been found')
sys.exit(1)
data = pd.DataFrame(index=config['idx_long'])
if len(paths) == 0:
logging.info('No data file specified for the table ' + varname + '. Using default value ' + str(default))
if default is None:
pass
elif isinstance(default, (float, int)):
data = pd.DataFrame(default, index=config['idx_long'], columns=zones)
else:
logging.critical('Default value provided for table ' + varname + ' is not valid')
sys.exit(1)
elif SingleFile:
# If it is only one file, there is a header with the zone code
tmp = load_time_series(config, paths['all'])
if len(tmp.columns) == 1: # if there is only one column, assign its value to all the zones, whatever the header
try: # if the column header is numerical, there was probably no header. Load the file again.
float(tmp.columns[0]) # this will fail if the header is not numerical
tmp = load_time_series(config, paths['all'], header=None)
except (ValueError, TypeError):  # the header is not numerical: keep the file as loaded
pass
for key in zones:
data[key] = tmp.iloc[:, 0]
else:
for key in zones:
if key in tmp:
# data[key] = tmp[key]
data = pd.concat([data, tmp[key]], axis=1)
else:
logging.error(
'Zone ' + key + ' could not be found in the file ' + path + '. Using default value ' + str(
default))
if default is None:
pass
elif isinstance(default, (float, int)):
data[key] = default
else:
logging.critical('Default value provided for table ' + varname + ' is not valid')
sys.exit(1)
else: # assembling the files in a single dataframe:
for z in paths:
# In case of separated files for each zone, there is no header
tmp = load_time_series(config, paths[z])
data[z] = tmp.iloc[:, 0]
return data
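# Illustrative usage (a minimal sketch, not part of the original module): once a config
# dictionary has been loaded (see load_config below) and completed with the simulation index
# 'idx_long' by the Dispa-SET preprocessing, a zone-indexed table such as the demand time
# series can be retrieved as follows. The config path is hypothetical:
#
#     config = load_config('ConfigFiles/ConfigTest.yml')      # hypothetical file
#     demand = NodeBasedTable('Demand', config, default=0)    # one column per zone
#     peak_load = demand.max()                                # peak demand of each zone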
def UnitBasedTable(plants, varname, config, fallbacks=['Unit'], default=None, RestrictWarning=None):
"""
This function loads the tabular data stored in csv files and assigns the
proper values to each unit of the plants dataframe. If the unit-specific
value is not found in the data, the script can fallback on more generic
data (e.g. fuel-based, technology-based, zone-based) or to the default value.
The order in which the data should be loaded is specified in the fallback
list. For example, ['Unit','Technology'] means that the script will first
try to find a perfect match for the unit name in the data table. If not found,
a column with the unit technology as header is searched. If not found, the
default value is assigned.
:param plants: Dataframe with the units for which data is required
:param varname: Variable name (as defined in config)
:param config: Dispa-SET config file
:param fallbacks: List with the order of data sources.
:param default: Default value to be applied if no data is found
:param RestrictWarning: Only display the warnings if the unit belongs to the list of technologies provided in
this parameter
:return: Dataframe with the time series for each unit
"""
path = config[varname]
zones = config['zones']
paths = {}
if os.path.isfile(path):
paths['all'] = path
SingleFile = True
elif '##' in path:
for z in zones:
path_c = path.replace('##', str(z))
if os.path.isfile(path_c):
paths[str(z)] = path_c
else:
logging.error(
'No data file found for the table ' + varname + ' and zone ' + z + '. File ' + path_c +
' does not exist')
# sys.exit(1)
SingleFile = False
elif path != '':
logging.critical(
'A path has been specified for table ' + varname + ' (' + path + ') but no file has been found')
sys.exit(1)
data = pd.DataFrame(index=config['idx_long'])
if len(paths) == 0:
logging.info('No data file specified for the table ' + varname + '. Using default value ' + str(default))
if default is None:
out = pd.DataFrame(index=config['idx_long'])
elif isinstance(default, (float, int)):
out = pd.DataFrame(default, index=config['idx_long'], columns=plants['Unit'])
else:
logging.critical('Default value provided for table ' + varname + ' is not valid')
sys.exit(1)
else: # assembling the files in a single dataframe:
columns = []
for z in paths:
tmp = load_time_series(config, paths[z])
if SingleFile:
data = tmp.copy()
else: # use the multi-index header with the zone
for key in tmp:
columns.append((z, key))
tmp.columns = pd.MultiIndex.from_product([[z], tmp.columns])
data = pd.concat([data, tmp], axis=1)
if not SingleFile:
data.columns = pd.MultiIndex.from_tuples(columns, names=['Zone', 'Data'])
# For each plant and each fallback key, try to find the corresponding column in the data
out = pd.DataFrame(index=config['idx_long'])
new_header = []
for j in plants.index:
warning = True
if RestrictWarning is not None:
warning = False
if plants.loc[j, 'Technology'] in RestrictWarning:
warning = True
u = plants.loc[j, 'Unit']
found = False
for i, key in enumerate(fallbacks):
if SingleFile:
header = plants.loc[j, key]
else:
header = (plants.loc[j, 'Zone'], plants.loc[j, key])
if header in data:
if SingleFile:
new_header.append(u)
out = pd.concat([out, data[header]], axis=1)
else:
new_header.append(u)
# out[u] = data[header]
out = pd.concat([out, data[header]], axis=1)
found = True
if i > 0 and warning:
logging.warning(
'No specific information was found for unit ' + u + ' in table ' + varname +
'. The generic information for ' + str(header) + ' has been used')
break
if not found:
if warning:
logging.info(
'No specific information was found for unit ' + u + ' in table ' + varname +
'. Using default value ' + str(default))
if default is not None:
# out[u] = default
out = pd.concat([out, pd.DataFrame(index=out.index, columns=[u]).fillna(default)], axis=1)
new_header.append(u)
out.columns = new_header
if not out.columns.is_unique:
logging.critical(
'The column headers of table "' + varname + '" are not unique! The following headers are duplicated: ' +
str(out.columns[out.columns.duplicated()].unique().tolist()))
sys.exit(1)
return out
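# Illustrative usage (a minimal sketch, not part of the original module): the fallback list
# controls how a data column is matched to each unit. With fallbacks=['Unit', 'Technology'],
# a column named exactly after the unit is preferred and a technology-named column is used
# otherwise. 'plants' and 'config' below are hypothetical, and config is assumed to contain
# the simulation index 'idx_long' and a valid 'Outages' path:
#
#     outages = UnitBasedTable(plants, 'Outages', config,
#                              fallbacks=['Unit', 'Technology'], default=0)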
def GenericTable(headers, varname, config, default=None):
"""
This function loads the tabular data stored in csv files and assigns the
proper values to each pre-specified column. If not found, the
default value is assigned.
:param headers: List with the column headers to be read
:param varname: Variable to be read
:param config: Config variable
:param default: Default value to be applied if no data is found
:return: Dataframe with the time series for each unit
"""
path = config[varname]
paths = {}
if os.path.isfile(path):
paths['all'] = path
SingleFile = True
elif '##' in path:
logging.critical('The table provided for variable ' + varname + ' must be a single file')
sys.exit(1)
elif path != '':
logging.critical(
'A path has been specified for table ' + varname + ' (' + path + ') but no file has been found')
sys.exit(1)
data = pd.DataFrame(index=config['idx_long'])
if len(paths) == 0:
logging.info('No data file specified for the table ' + varname + '. Using default value ' + str(default))
if default is None:
out = pd.DataFrame(index=config['idx_long'])
elif isinstance(default, (float, int)):
out = pd.DataFrame(default, index=config['idx_long'], columns=headers)
else:
logging.critical('Default value provided for table ' + varname + ' is not valid')
sys.exit(1)
else: # assembling the files in a single dataframe:
data = load_time_series(config, paths['all'])
# For each specified header, try to find the corresponding column in the data
out = pd.DataFrame(index=config['idx_long'])
for header in headers:
if header in data:
out[header] = data[header]
else:
logging.info('No specific information was found for header ' + header + ' in table ' + varname +
'. Using default value ' + str(default))
if default is not None:
out[header] = default
if not out.columns.is_unique:
logging.critical('The column headers of table "' + varname +
'" are not unique!. The following headers are duplicated: ' +
str(out.columns.get_duplicates()))
sys.exit(1)
return out
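# Illustrative usage (a minimal sketch, not part of the original module): reading a table with
# one pre-specified column per zone, e.g. upward reserve requirements. 'config' is hypothetical
# and assumed to contain 'idx_long', 'zones' and a valid 'Reserve2U' path:
#
#     reserve_up = GenericTable(config['zones'], 'Reserve2U', config, default=0)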
def merge_series(plants, oldplants, data, method='WeightedAverage', tablename=''):
"""
Function that merges the time series corresponding to the merged units (e.g. outages, inflows, etc.)
:param plants: Pandas dataframe with final units after clustering (must contain 'FormerUnits')
:param oldplants: Pandas dataframe with the original units
:param data: Pandas dataframe with the time series and the original unit names as column header
:param method: Select the merging method ('WeightedAverage'/'StorageWeightedAverage'/'Sum')
:param tablename: Name of the table being processed (e.g. 'Outages'), used in the warnings
:return merged: Pandas dataframe with the merged time series when necessary
"""
# backward compatibility:
if not "Nunits" in plants:
plants['Nunits'] = 1
if not 'FormerUnits' in plants:
logging.critical('The unit table provided must contain the columns "FormerUnits"')
sys.exit(1)
merged = pd.DataFrame(index=data.index)
# Create a dictionary relating the former units to the new (clustered) ones:
units = {}
for u in plants.index:
for uu in plants.loc[u, 'FormerUnits']:
units[uu] = u
# First check the data:
if not isinstance(data, pd.DataFrame):
logging.critical('The input "' + tablename + '" to the merge_series function must be a dataframe')
sys.exit(1)
for key in data:
if str(data[key].dtype) not in ['bool', 'int', 'float', 'float16', 'float32', 'float64', 'float128', 'int8',
'int16', 'int32', 'int64']:
logging.critical('The column "' + str(key) + '" of table + "' + tablename + '" is not numeric!')
for key in data:
if key in units:
newunit = units[key]
if newunit not in merged:
# if the column name is in the mapping and the new unit has not been processed yet
oldnames = plants.loc[newunit, 'FormerUnits']
if all([name in data for name in oldnames]):
subunits = data[oldnames]
else:
for name in oldnames:
if name not in data:
logging.critical(
'The column "' + name + '" is required for the aggregation of unit "' + key +
'", but it has not been found in the input data')
sys.exit(1)
value = np.zeros(len(data))
# Combining the subunit time series according to the selected merging method:
if method == 'WeightedAverage':
for name in oldnames:
value = value + subunits[name] * np.maximum(1e-9, oldplants['PowerCapacity'][name] *
oldplants['Nunits'][name])
P_j = np.sum(np.maximum(1e-9, oldplants['PowerCapacity'][oldnames] * oldplants['Nunits'][oldnames]))
# merged[newunit] = value / P_j
new_capacity = pd.DataFrame(value / P_j)
new_capacity.columns = [newunit]
merged = pd.concat([merged, new_capacity], axis=1)
elif method == 'StorageWeightedAverage':
for name in oldnames:
value = value + subunits[name] * np.maximum(1e-9, oldplants['STOCapacity'][name] *
oldplants['Nunits'][name])
P_j = np.sum(np.maximum(1e-9, oldplants['STOCapacity'][oldnames] * oldplants['Nunits'][oldnames]))
# merged[newunit] = value / P_j
new_capacity = pd.DataFrame(value / P_j)
new_capacity.columns = [newunit]
merged = pd.concat([merged, new_capacity], axis=1)
elif method == 'Sum':
# merged[newunit] = subunits.sum(axis=1)
new_capacity = pd.DataFrame(subunits.sum(axis=1))
new_capacity.columns = [newunit]
merged = pd.concat([merged, new_capacity], axis=1)
else:
logging.critical('Method "' + str(method) + '" unknown in function merge_series')
sys.exit(1)
elif key in oldplants['Unit'].values:
if not isinstance(key, tuple):
# if the columns header is a tuple, it does not come from the data and has been added by Dispa-SET
logging.warning('Column ' + str(key) + ' present in the table "' + tablename +
'" not found in the mapping between original and clustered units. Skipping')
else:
if not isinstance(key, tuple):
# if the columns header is a tuple, it does not come from the data and has been added by Dispa-SET
logging.warning('Column ' + str(key) + ' present in the table "' + tablename +
'" not found in the table of power plants. Skipping')
return merged
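# Illustrative usage (a minimal sketch, not part of the original module): after clustering,
# the outage time series of the original units can be aggregated into the clustered units.
# 'plants_clustered', 'plants' and 'outages' are hypothetical dataframes; 'plants_clustered'
# must contain the 'FormerUnits' column produced by the clustering step:
#
#     merged_outages = merge_series(plants_clustered, plants, outages,
#                                   method='WeightedAverage', tablename='Outages')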
def define_parameter(sets_in, sets, value=0):
"""
Function to define a Dispa-SET parameter and fill it with a constant value
:param sets_in: List with the labels of the sets corresponding to the parameter
:param sets: Dictionary containing the definition of all the sets (must comprise those referenced in sets_in)
:param value: Default value to attribute to the parameter
:return: Dictionary with the parameter sets ('sets') and its filled value array ('val')
"""
if value == 'bool':
values = np.zeros([len(sets[setx]) for setx in sets_in], dtype='bool')
elif value == 0:
values = np.zeros([len(sets[setx]) for setx in sets_in])
elif value == 1:
values = np.ones([len(sets[setx]) for setx in sets_in])
else:
values = np.ones([len(sets[setx]) for setx in sets_in]) * value
return {'sets': sets_in, 'val': values}
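# Illustrative usage (a minimal sketch, not part of the original module): creating an
# availability-factor-like parameter defined over a unit set and a time set. The 'sets'
# dictionary below is hypothetical:
#
#     sets = {'u': ['Unit1', 'Unit2'], 'h': list(range(8760))}
#     AF = define_parameter(['u', 'h'], sets, value=1)
#     AF['val'].shape     # -> (2, 8760), filled with ones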
def load_time_series(config, path, header='infer'):
"""
Function that loads time series data, checks the compatibility of its index
with the simulation index and guesses the correct timestamps when no exact match
between the required index and the data is present
:param config: Dispa-SET config dictionary
:param path: Path to the desired time series file
:param header: Header argument passed to pandas.read_csv ('infer' by default)
:return: Reindexed time series
"""
data = pd.read_csv(path, index_col=0, parse_dates=True, header=header, keep_default_na=False)
if not data.index.is_unique:
logging.critical('The index of data file ' + path + ' is not unique. Please check the data')
sys.exit(1)
if not data.index.is_monotonic_increasing:
logging.error('The index of data file ' + path + ' is not monotonically increasing. '
'Trying to check if it can be parsed with a "day first" format ')
data = pd.read_csv(path, index_col=0, parse_dates=True, header=header, dayfirst=True, keep_default_na=False)
if not data.index.is_monotonic_increasing:
logging.critical('Could not parse index of ' + path + '. To avoid problems make sure that '
'you use the standard ISO date format (yyyy-mm-dd hh:mm:ss)')
sys.exit(1)
# First convert numerical indexes into datetimeindex:
if data.index.is_numeric():
if len(data) == len(config['idx']): # The data has the same length as the provided index range
logging.info('A numerical index has been found for file ' + path +
'. It has the same length as the target simulation and is therefore considered valid')
data.index = config['idx']
elif len(data) == 8760:
logging.info('A numerical index has been found for file ' + path +
'. Since it contains 8760 elements, it is assumed that it corresponds to a whole year')
data.index = pd.date_range(start=dt.datetime(*(config['idx'][0].year, 1, 1, 0, 0)),
end=dt.datetime(*(config['idx'][0].year, 12, 31, 23, 59, 59)),
freq=commons['TimeStep'])
else:
logging.critical('A numerical index has been found for file ' + path + '. However, its length does not '
'allow guessing its timestamps. Please use an 8760-element time series')
sys.exit(1)
if data.index.inferred_type == 'datetime64':
data.index = data.index.tz_localize(None) # removing locational data
main_year = data.groupby(data.index.year).size()
year = int(main_year[main_year >= 8759].index.values)
data = data[data.index.year == year]
# Checking if the required index entries are in the data:
common = data.index.intersection(config['idx'])
if len(common) == 0:
# check if original year is leap year and destination year is not (remove leap date)
if (data.index[0].is_leap_year is True) and (config['idx'][0].is_leap_year is False):
data = data[~((data.index.month == 2) & (data.index.day == 29))]
logging.warning('File ' + path + ': data for year ' + str(data.index[0].year) +
' is used instead of year ' + str(config['idx'][0].year))
data.index = data.index.map(lambda t: t.replace(year=config['idx'][0].year))
# check if both years are either leap or non leap
elif (data.index[0].is_leap_year is True) and (config['idx'][0].is_leap_year is True) or \
(data.index[0].is_leap_year is False) and (config['idx'][0].is_leap_year is False):
logging.warning('File ' + path + ': data for year ' + str(data.index[0].year) +
' is used instead of year ' + str(config['idx'][0].year) +
'. Leap year date is removed from the original DataFrame.')
data.index = data.index.map(lambda t: t.replace(year=config['idx'][0].year))
# check if original year is not a leap year and destination year is a leap year
# (add the leap day and take the average of the hourly values between Feb 28 and Mar 1)
elif (data.index[0].is_leap_year is False) and (config['idx'][0].is_leap_year is True):
logging.warning('File ' + path + ': data for year ' + str(data.index[0].year) +
' is used instead of year ' + str(config['idx'][0].year) +
'. Leap year date is interpolated between the two neighbouring days.')
data.index = data.index.map(lambda t: t.replace(year=config['idx'][0].year))
mask = data.loc[str(config['idx'][0].year) + '-2-28': str(config['idx'][0].year) + '-3-1']
mask = mask.groupby(mask.index.hour).mean()
time = pd.date_range(str(config['idx'][0].year) + '-2-29', periods=24, freq='H')
mask = mask.set_index(time)
data = data[~data.index.duplicated(keep='last')]
data = data.reindex(config['idx'])
data.update(mask)
# recompute common index entries, and check again:
common = data.index.intersection(config['idx'])
if len(common) < len(config['idx']) - 1:
logging.critical('File ' + path + ': the index does not contain the necessary time range (from ' + str(
config['idx'][0]) + ' to ' + str(config['idx'][-1]) + ')')
sys.exit(1)
elif len(common) == len(config['idx']) - 1: # there is only one data point missing. This is deemed acceptable
logging.warning('File ' + path + ': there is one data point missing in the time series. '
'It will be filled with the nearest data')
else:
pass # the defined simulation index is found within the data. No action required
else:
logging.critical('Index for file ' + path + ' is not valid')
sys.exit(1)
# re-indexing with the longer index (including look-ahead) and filling possibly missing data at the beginning and
# at the end:
return data.reindex(config['idx_long'], method='nearest').fillna(method='bfill')
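# Illustrative usage (a minimal sketch, not part of the original module): loading a csv time
# series and reindexing it on the simulation index (including the look-ahead period). The file
# path is hypothetical, and 'config' must already contain 'idx' and 'idx_long':
#
#     ts = load_time_series(config, 'Database/Demand/BE/2016.csv')   # hypothetical path
#     ts_horizon = ts.loc[config['idx']]                             # restrict to the simulated horizon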
def load_config(ConfigFile, AbsPath=True):
"""
Wrapper function around load_config_excel and load_config_yaml
"""
if ConfigFile.endswith(('.xlsx', '.xls')):
config = load_config_excel(ConfigFile, AbsPath=AbsPath)
elif ConfigFile.endswith(('.yml', '.yaml')):
config = load_config_yaml(ConfigFile, AbsPath=AbsPath)
else:
logging.critical('The extension of the config file should be .xlsx, .xls, .yml or .yaml')
sys.exit(1)
return config
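# Illustrative usage (a minimal sketch, not part of the original module): the wrapper picks the
# Excel or YAML loader based on the file extension. The paths are hypothetical:
#
#     config = load_config('ConfigFiles/ConfigTest.xlsx')     # -> load_config_excel
#     config = load_config('ConfigFiles/ConfigTest.yml')      # -> load_config_yaml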
def read_truefalse(sheet, rowstart, colstart, rowstop, colstop, colapart=1):
"""
Function that reads a two-column format with a list of strings in the first
column and a list of True/False values in the second column.
The list of strings associated with a True value is returned
"""
out = []
for i in range(rowstart, rowstop + 1):
if sheet.cell_value(i, colstart + colapart) == 1:
out.append(sheet.cell_value(i, colstart))
return out
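# Illustrative usage (a minimal sketch, not part of the original module): given an xlrd sheet
# in which one column lists zone codes and the next column (colapart=1 by default) holds 1/0
# activation flags, the selected zones would be read as follows (row/column indices are
# hypothetical):
#
#     zones = read_truefalse(sheet, 86, 1, 109, 3)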
def read_Participation(sheet, rowstart, colstart, rowstop, colapart=1):
"""
Creates a dict with, for each technology, a two-element list of participation flags
(first value without CHP, second value with CHP), where 0 stands for False and 1 for True
:param sheet: Excel sheet to load data from
:param rowstart: Row to start reading the data
:param colstart: Column to start reading the data
:param rowstop: Row to stop reading the data
:param colapart: Number of columns between the technology name and the data
:return: Dictionary with the reserve participation flags per technology
"""
Reserveparticipation = {}
for i in range(rowstart, rowstop):
Reserveparticipation[sheet.cell_value(i, colstart)] = [sheet.cell_value(i, colstart + colapart),
sheet.cell_value(i, colstart + colapart + 1)]
return Reserveparticipation
def load_config_excel(ConfigFile, AbsPath=True):
"""
Function that loads the DispaSET excel config file and returns a dictionary
with the values
:param ConfigFile: String with (relative) path to the DispaSET excel configuration file
:param AbsPath: If true, relative paths are automatically changed into absolute paths (recommended)
"""
import xlrd
xlrd.xlsx.ensure_elementtree_imported(False, None)
xlrd.xlsx.Element_has_iter = True
wb = xlrd.open_workbook(filename=ConfigFile) # Option for csv to be added later
sheet = wb.sheet_by_name('main')
config = {}
if sheet.cell_value(0, 0) == 'Dispa-SET Configuration File (v20.01)':
config['Description'] = sheet.cell_value(5, 1)
config['StartDate'] = xlrd.xldate_as_tuple(sheet.cell_value(56, 2), wb.datemode)
config['StopDate'] = xlrd.xldate_as_tuple(sheet.cell_value(57, 2), wb.datemode)
config['HorizonLength'] = int(sheet.cell_value(58, 2))
config['LookAhead'] = int(sheet.cell_value(59, 2))
# Defining the input locations in the config file:
StdParameters = {'SimulationDirectory': 33, 'WriteGDX': 34, 'WritePickle': 35, 'GAMS_folder': 36,
'cplex_path': 37, 'DataTimeStep': 60, 'SimulationTimeStep': 61,
'SimulationType': 76, 'ReserveCalculation': 77, 'AllowCurtailment': 78,
'HydroScheduling': 98, 'HydroSchedulingHorizon': 99, 'InitialFinalReservoirLevel': 100}
PathParameters = {'Demand': 124, 'Outages': 126, 'PowerPlantData': 127, 'RenewablesAF': 128,
'LoadShedding': 129, 'NTC': 130, 'Interconnections': 131, 'ReservoirScaledInflows': 132,
'PriceOfNuclear': 180, 'PriceOfBlackCoal': 181, 'PriceOfGas': 182,
'PriceOfFuelOil': 183, 'PriceOfBiomass': 184, 'PriceOfCO2': 166,
'ReservoirLevels': 133, 'PriceOfLignite': 185, 'PriceOfPeat': 186,
'HeatDemand': 134, 'CostHeatSlack': 165, 'CostLoadShedding': 168,
'ShareOfFlexibleDemand': 125,
'Temperatures': 135, 'PriceTransmission': 169, 'Reserve2U': 160, 'Reserve2D': 161,
'H2Demand': 136, 'CostH2Slack': 170}
modifiers = {'Demand': 274, 'Wind': 275, 'Solar': 276, 'Storage': 277}
default = {'ReservoirLevelInitial': 101, 'ReservoirLevelFinal': 102, 'PriceOfNuclear': 180,
'PriceOfBlackCoal': 181,
'PriceOfGas': 182, 'PriceOfFuelOil': 183, 'PriceOfBiomass': 184, 'PriceOfCO2': 166,
'PriceOfLignite': 185,
'PriceOfPeat': 186, 'LoadShedding': 129, 'CostHeatSlack': 167, 'CostLoadShedding': 168,
'ValueOfLostLoad': 204,
'PriceOfSpillage': 205, 'WaterValue': 206, 'ShareOfQuickStartUnits': 163,
'ShareOfFlexibleDemand': 125,
'DemandFlexibility': 162, 'PriceTransmission': 169, 'CostH2Slack': 170}
for p in StdParameters:
config[p] = sheet.cell_value(StdParameters[p], 2)
for p in PathParameters:
config[p] = sheet.cell_value(PathParameters[p], 2)
config['modifiers'] = {}
for p in modifiers:
config['modifiers'][p] = sheet.cell_value(modifiers[p], 2)
config['default'] = {}
for p in default:
config['default'][p] = sheet.cell_value(default[p], 5)
# True/False values:
config['zones'] = read_truefalse(sheet, 225, 1, 247, 3)
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 4, 247, 6)
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 7, 247, 9) #MARCO NAVIA
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 10, 247, 12) #MARCO NAVIA
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 13, 247, 15) #MARCO NAVIA
config['mts_zones'] = read_truefalse(sheet, 225, 1, 247, 3, 2)
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 4, 247, 6, 2)
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 7, 247, 9, 2) #MARCO NAVIA
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 10, 247, 12, 2) #MARCO NAVIA
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 13, 247, 15, 2) #MARCO NAVIA
config['ReserveParticipation'] = read_truefalse(sheet, 305, 1, 321, 3)
config['ReserveParticipation_CHP'] = read_truefalse(sheet, 342, 1, 345, 3)
# Set default values (for backward compatibility):
for param in DEFAULTS:
if config['default'][param] == '':
config['default'][param] = DEFAULTS[param]
logging.warning(
'No value was provided in config file for {}. Will use {}'.format(param, DEFAULTS[param]))
if AbsPath:
# Changing all relative paths to absolute paths. Relative paths must be defined
# relative to the parent folder of the config file.
abspath = os.path.abspath(ConfigFile)
basefolder = os.path.abspath(os.path.join(os.path.dirname(abspath), os.pardir))
if not os.path.isabs(config['SimulationDirectory']):
config['SimulationDirectory'] = os.path.join(basefolder, config['SimulationDirectory'])
for param in PathParameters:
if config[param] == '' or config[param].isspace():
config[param] = ''
elif not os.path.isabs(config[param]):
config[param] = os.path.join(basefolder, config[param])
logging.info("Using config file (v20.01) " + ConfigFile + " to build the simulation environment")
logging.info("Using " + config['SimulationDirectory'] + " as simulation folder")
logging.info("Description of the simulation: " + config['Description'])
return config
elif sheet.cell_value(0, 0) == 'Dispa-SET Configuration File (v20.02)':
config['Description'] = sheet.cell_value(5, 1)
config['StartDate'] = xlrd.xldate_as_tuple(sheet.cell_value(56, 2), wb.datemode)
config['StopDate'] = xlrd.xldate_as_tuple(sheet.cell_value(57, 2), wb.datemode)
config['HorizonLength'] = int(sheet.cell_value(58, 2))
config['LookAhead'] = int(sheet.cell_value(59, 2))
# Defining the input locations in the config file:
StdParameters = {
# Scenario options
'SimulationDirectory': 33, 'WriteGDX': 34, 'WritePickle': 35, 'GAMS_folder': 36,
'cplex_path': 37,
# Horizon Settings
'DataTimeStep': 60, 'SimulationTimeStep': 61,
# Simulation Options
'SimulationType': 76, 'ReserveCalculation': 77, 'AllowCurtailment': 78,
# Mid-term scheduling related
'HydroScheduling': 98, 'HydroSchedulingHorizon': 99, 'InitialFinalReservoirLevel': 100
}
PathParameters = {
# Power system data
'Demand': 124, 'ShareOfFlexibleDemand': 125, 'Outages': 126, 'PowerPlantData': 127,
'RenewablesAF': 128, 'LoadShedding': 129,
# Interconnection data
'NTC': 130, 'Interconnections': 131,
# Hydro data
'ReservoirScaledInflows': 132, 'ReservoirLevels': 133,
# Heat data
'HeatDemand': 134, 'Temperatures': 135,
# Geo data
'GeoData': 136,
# Hydrogen data
'H2RigidDemand': 137, 'H2FlexibleDemand': 138, 'H2FlexibleCapacity': 139,
# Reserves input data
'Reserve2U': 160, 'Reserve2D': 161,
# Other costs related data
'PriceOfCO2': 166, 'CostHeatSlack': 167, 'CostLoadShedding': 168, 'PriceTransmission': 169,
'CostH2Slack': 170, 'CostCurtailment': 171,
# Fuel price related data
'PriceOfNuclear': 180, 'PriceOfBlackCoal': 181, 'PriceOfGas': 182, 'PriceOfFuelOil': 183,
'PriceOfBiomass': 184, 'PriceOfLignite': 185, 'PriceOfPeat': 186, 'PriceOfAmmonia': 187
}
modifiers = {'Demand': 274, 'Wind': 275, 'Solar': 276, 'Storage': 277}
default = {
# Hydro scheduling defaults
'ReservoirLevelInitial': 101, 'ReservoirLevelFinal': 102,
# Fuel price defaults
'PriceOfNuclear': 180, 'PriceOfBlackCoal': 181, 'PriceOfGas': 182, 'PriceOfFuelOil': 183,
'PriceOfBiomass': 184, 'PriceOfLignite': 185, 'PriceOfPeat': 186, 'PriceOfAmmonia': 187,
# Other price defaults
'PriceOfCO2': 166, 'CostHeatSlack': 167, 'CostLoadShedding': 168, 'PriceTransmission': 169,
'CostH2Slack': 170, 'CostCurtailment': 171,
# Optimization and infeasibility cost data
'ShareOfFlexibleDemand': 125, 'LoadShedding': 129,
'DemandFlexibility': 162, 'ShareOfQuickStartUnits': 163,
'ValueOfLostLoad': 204, 'PriceOfSpillage': 205, 'WaterValue': 206
}
for p in StdParameters:
config[p] = sheet.cell_value(StdParameters[p], 2)
for p in PathParameters:
config[p] = sheet.cell_value(PathParameters[p], 2)
config['modifiers'] = {}
for p in modifiers:
config['modifiers'][p] = sheet.cell_value(modifiers[p], 2)
config['default'] = {}
for p in default:
config['default'][p] = sheet.cell_value(default[p], 5)
# True/False values:
config['zones'] = read_truefalse(sheet, 225, 1, 250, 3)
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 4, 250, 6)
if sheet.cell_value(225, 7) != '':
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 7, 247, 9)
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 10, 247, 12)
config['zones'] = config['zones'] + read_truefalse(sheet, 225, 13, 247, 15)
config['mts_zones'] = read_truefalse(sheet, 225, 1, 250, 3, 2)
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 4, 250, 6, 2)
if sheet.cell_value(225, 7) != '':
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 7, 247, 9, 2)
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 10, 247, 12, 2)
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 225, 13, 247, 15, 2)
config['ReserveParticipation'] = read_truefalse(sheet, 305, 1, 321, 3)
config['ReserveParticipation'] = config['ReserveParticipation'] + read_truefalse(sheet, 305, 4, 321, 6)
config['ReserveParticipation_CHP'] = read_truefalse(sheet, 299, 1, 302, 3)
# Set default values (for backward compatibility):
for param in DEFAULTS:
if config['default'][param] == '':
config['default'][param] = DEFAULTS[param]
logging.warning(
'No value was provided in config file for {}. Will use {}'.format(param, DEFAULTS[param]))
if AbsPath:
# Changing all relative paths to absolute paths. Relative paths must be defined
# relative to the parent folder of the config file.
abspath = os.path.abspath(ConfigFile)
basefolder = os.path.abspath(os.path.join(os.path.dirname(abspath), os.pardir))
if not os.path.isabs(config['SimulationDirectory']):
config['SimulationDirectory'] = os.path.join(basefolder, config['SimulationDirectory'])
for param in PathParameters:
if config[param] == '' or config[param].isspace():
config[param] = ''
elif not os.path.isabs(config[param]):
config[param] = os.path.join(basefolder, config[param])
logging.info("Using config file (v20.02) " + ConfigFile + " to build the simulation environment")
logging.info("Using " + config['SimulationDirectory'] + " as simulation folder")
logging.info("Description of the simulation: " + config['Description'])
return config
elif sheet.cell_value(0, 0) == 'Dispa-SET Configuration File':
config['Description'] = sheet.cell_value(5, 1)
config['SimulationDirectory'] = sheet.cell_value(17, 2)
config['WriteExcel'] = sheet.cell_value(18, 2)
config['WriteGDX'] = sheet.cell_value(19, 2)
config['WritePickle'] = sheet.cell_value(20, 2)
config['GAMS_folder'] = sheet.cell_value(21, 2)
config['cplex_path'] = sheet.cell_value(22, 2)
config['StartDate'] = xlrd.xldate_as_tuple(sheet.cell_value(30, 2), wb.datemode)
config['StopDate'] = xlrd.xldate_as_tuple(sheet.cell_value(31, 2), wb.datemode)
config['HorizonLength'] = int(sheet.cell_value(32, 2))
config['LookAhead'] = int(sheet.cell_value(33, 2))
config['DataTimeStep'] = sheet.cell_value(34, 2)
config['SimulationTimeStep'] = sheet.cell_value(35, 2)
config['SimulationType'] = sheet.cell_value(46, 2)
config['ReserveCalculation'] = sheet.cell_value(47, 2)
config['AllowCurtailment'] = sheet.cell_value(48, 2)
config['HydroScheduling'] = sheet.cell_value(53, 2)
config['HydroSchedulingHorizon'] = sheet.cell_value(54, 2)
config['InitialFinalReservoirLevel'] = sheet.cell_value(55, 2)
# Set default values (for backward compatibility):
NonEmptyParameters = {'DataTimeStep': 1, 'SimulationTimeStep': 1, 'HydroScheduling': 'Off',
'HydroSchedulingHorizon': 'Annual', 'InitialFinalReservoirLevel': True}
for param in NonEmptyParameters:
if config[param] == '':
config[param] = NonEmptyParameters[param]
# List of parameters for which an external file path must be specified:
PARAMS = ['Demand', 'Outages', 'PowerPlantData', 'RenewablesAF', 'LoadShedding', 'NTC', 'Interconnections',
'ReservoirScaledInflows', 'PriceOfNuclear', 'PriceOfBlackCoal', 'PriceOfGas', 'PriceOfFuelOil',
'PriceOfBiomass', 'PriceOfCO2', 'ReservoirLevels', 'PriceOfLignite', 'PriceOfPeat', 'HeatDemand',
'CostHeatSlack', 'CostLoadShedding', 'ShareOfFlexibleDemand']
for i, param in enumerate(PARAMS):
config[param] = sheet.cell_value(61 + i, 2)
# List of new parameters for which an external file path must be specified:
params2 = ['Temperatures', 'PriceTransmission', 'Reserve2D', 'Reserve2U', 'H2Demand', 'CostH2Slack', 'GeoData']
if sheet.nrows > 150: # for backward compatibility (old excel sheets had less than 150 rows)
for i, param in enumerate(params2):
config[param] = sheet.cell_value(156 + i, 2)
else:
for param in params2:
config[param] = ''
if AbsPath:
# Changing all relative paths to absolute paths. Relative paths must be defined
# relative to the parent folder of the config file.
abspath = os.path.abspath(ConfigFile)
basefolder = os.path.abspath(os.path.join(os.path.dirname(abspath), os.pardir))
if not os.path.isabs(config['SimulationDirectory']):
config['SimulationDirectory'] = os.path.join(basefolder, config['SimulationDirectory'])
for param in PARAMS + params2:
if config[param] == '' or config[param].isspace():
config[param] = ''
elif not os.path.isabs(config[param]):
config[param] = os.path.join(basefolder, config[param])
config['default'] = {}
config['default']['ReservoirLevelInitial'] = sheet.cell_value(56, 5)
config['default']['ReservoirLevelFinal'] = sheet.cell_value(57, 5)
config['default']['PriceOfNuclear'] = sheet.cell_value(69, 5)
config['default']['PriceOfBlackCoal'] = sheet.cell_value(70, 5)
config['default']['PriceOfGas'] = sheet.cell_value(71, 5)
config['default']['PriceOfFuelOil'] = sheet.cell_value(72, 5)
config['default']['PriceOfBiomass'] = sheet.cell_value(73, 5)
config['default']['PriceOfCO2'] = sheet.cell_value(74, 5)
config['default']['PriceOfLignite'] = sheet.cell_value(76, 5)
config['default']['PriceOfPeat'] = sheet.cell_value(77, 5)
config['default']['LoadShedding'] = sheet.cell_value(65, 5)
config['default']['CostHeatSlack'] = sheet.cell_value(79, 5)
config['default']['CostLoadShedding'] = sheet.cell_value(80, 5)
config['default']['ValueOfLostLoad'] = sheet.cell_value(81, 5)
config['default']['PriceOfSpillage'] = sheet.cell_value(82, 5)
config['default']['WaterValue'] = sheet.cell_value(83, 5)
config['default']['ShareOfQuickStartUnits'] = 0.5 # to be added to xlsx file
# Set default values (for backward compatibility):
for param in DEFAULTS:
if config['default'].get(param, '') == '':
config['default'][param] = DEFAULTS[param]
config['zones'] = read_truefalse(sheet, 86, 1, 109, 3)
config['zones'] = config['zones'] + read_truefalse(sheet, 86, 4, 109, 6)
config['mts_zones'] = read_truefalse(sheet, 86, 1, 109, 3, 2)
config['mts_zones'] = config['mts_zones'] + read_truefalse(sheet, 86, 4, 109, 6, 2)
config['modifiers'] = {}
config['modifiers']['Demand'] = sheet.cell_value(111, 2)
config['modifiers']['Wind'] = sheet.cell_value(112, 2)
config['modifiers']['Solar'] = sheet.cell_value(113, 2)
config['modifiers']['Storage'] = sheet.cell_value(114, 2)
# Read the technologies participating to reserve markets:
config['ReserveParticipation'] = read_truefalse(sheet, 131, 1, 145, 3)
config['ReserveParticipation_CHP'] = []
logging.info("Using config file " + ConfigFile + " to build the simulation environment")
logging.info("Using " + config['SimulationDirectory'] + " as simulation folder")
logging.info("Description of the simulation: " + config['Description'])
return config
else:
logging.critical('The format of the excel config file (defined by its main title) is not recognized')
sys.exit(1)
def load_config_yaml(filename, AbsPath=True):
""" Loads YAML file to dictionary"""
import yaml
with open(filename, 'r') as f:
try:
config = yaml.full_load(f)
except yaml.YAMLError as exc:
logging.error('Cannot parse config file: {}'.format(filename))
raise exc
# List of parameters to be added with a default value if not present (for backward compatibility):
params_to_be_added = {'Temperatures': '', 'DataTimeStep': 1, 'SimulationTimeStep': 1, 'HydroScheduling': 'Off',
'HydroSchedulingHorizon': 'Annual', 'InitialFinalReservoirLevel': True,
'ReserveParticipation_CHP': []}
for param in params_to_be_added:
if param not in config:
config[param] = params_to_be_added[param]
# Set default values (for backward compatibility):
NonEmptyDefaults = {'ReservoirLevelInitial': 0.5, 'ReservoirLevelFinal': 0.5, 'ValueOfLostLoad': 1E5,
'PriceOfSpillage': 1, 'WaterValue': 100, 'ShareOfQuickStartUnits': 0.5}
for param in NonEmptyDefaults:
if param not in config['default']:
config['default'][param] = NonEmptyDefaults[param]
# Define missing parameters if they were not provided in the config file
PARAMS = ['Demand', 'Outages', 'PowerPlantData', 'RenewablesAF', 'LoadShedding', 'NTC', 'Interconnections',
'ReservoirScaledInflows', 'PriceOfNuclear', 'PriceOfBlackCoal', 'PriceOfGas', 'PriceOfFuelOil',
'PriceOfBiomass', 'PriceOfCO2', 'ReservoirLevels', 'PriceOfLignite', 'PriceOfPeat', 'PriceOfAmmonia',
'HeatDemand', 'CostHeatSlack', 'CostLoadShedding', 'ShareOfFlexibleDemand', 'Temperatures',
'PriceTransmission', 'Reserve2D', 'Reserve2U', 'H2RigidDemand', 'H2FlexibleDemand', 'H2FlexibleCapacity',
'CostH2Slack', 'GeoData']
for param in PARAMS:
if param not in config:
config[param] = ''
global DEFAULTS
for key in DEFAULTS:
if key not in config['default']:
config['default'][key] = DEFAULTS[key]
if AbsPath:
# Changing all relative paths to absolute paths. Relative paths must be defined
# relative to the parent folder of the config file.
abspath = os.path.abspath(filename)
basefolder = os.path.abspath(os.path.join(os.path.dirname(abspath), os.pardir))
if not os.path.isabs(config['SimulationDirectory']):
config['SimulationDirectory'] = os.path.join(basefolder, config['SimulationDirectory'])
for param in PARAMS:
if config[param] == '' or config[param].isspace():
config[param] = ''
elif not os.path.isabs(config[param]):
config[param] = os.path.join(basefolder, config[param])
return config
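# Illustrative sketch (not part of the original module): a hypothetical excerpt of a YAML config
# accepted by load_config_yaml. Missing paths and default values are filled in afterwards for
# backward compatibility; all names and values below are examples only:
#
#     # ConfigTest.yml
#     # SimulationDirectory: Simulations/simulation_test
#     # Demand: Database/Demand/##/2016.csv
#     # zones: ['BE', 'NL']
#     # default: {PriceOfCO2: 25}
#
#     config = load_config_yaml('ConfigFiles/ConfigTest.yml')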
def export_yaml_config(ExcelFile, YAMLFile):
"""
Function that loads the DispaSET excel config file and dumps it as a yaml file.
:param ExcelFile: Path to the Excel config file
:param YAMLFile: Path to the YAML config file to be written
"""
import yaml
config = load_config_excel(ExcelFile, AbsPath=False)
with open(YAMLFile, 'w') as outfile:
yaml.dump(config, outfile, default_flow_style=False)
return True
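# Illustrative usage (a minimal sketch, not part of the original module): converting an Excel
# configuration file into its YAML equivalent. The paths are hypothetical:
#
#     export_yaml_config('ConfigFiles/ConfigTest.xlsx', 'ConfigFiles/ConfigTest.yml')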
def load_geo_data(path, header=None):
"""
Load geo data for individual zones.
:param path: absolute path to the geo data file
:param header: header row passed to pandas.read_csv (None by default)
"""
data = pd.read_csv(path, index_col=4, header=header, keep_default_na=False)
return data
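# Illustrative usage (a minimal sketch, not part of the original module): the geo data file is
# read with its fifth column (index 4) used as the index. 'config' and its 'GeoData' path are
# hypothetical:
#
#     geo = load_geo_data(config['GeoData'], header=0)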