Source code for oemof.tabular.datapackage.aggregation

# -*- coding: utf-8 -*-
"""
Module for aggregating sequences and elements.

"""
import os
import re

from datapackage import Package, Resource
import pandas as pd

# tsam is an optional dependency, needed only for time series aggregation
try:
    import tsam.timeseriesaggregation as tsam
except ImportError:
    raise ImportError("Need to install tsam to use aggregation!")

from .building import write_sequences
from .processing import copy_datapackage


def temporal_skip(datapackage, n, path="/tmp", name=None, *args):
    """
    Creates a new datapackage by aggregating the sequences inside the
    `sequence` folder of the specified datapackage, keeping only every
    `n`-th timestep.

    Parameters
    ----------
    datapackage: string
        Path to the meta data file datapackage.json
    n: integer
        Number of timesteps to skip
    path: string
        Path to the directory where the aggregated datapackage is stored
    name: string
        Name of the new, aggregated datapackage. If not specified, a name
        will be generated.
    """
    p = Package(datapackage)

    cwd = os.getcwd()

    if name is None:
        copied_package_name = (
            p.descriptor["name"] + "__temporal_skip__" + str(n)
        )
    else:
        copied_package_name = name

    copy_path = os.path.join(path, copied_package_name)

    copied_root = copy_datapackage(
        datapackage, os.path.abspath(copy_path), subset="data"
    )

    sequence_resources = [
        r
        for r in p.resources
        if re.match(r"^data/sequences/.*$", r.descriptor["path"])
    ]

    dfs = {
        r.name: pd.DataFrame(r.read(keyed=True))
        .set_index("timeindex")
        .astype(float)
        for r in sequence_resources
    }
    sequences = pd.concat(dfs.values(), axis=1)

    # keep every n-th timestep and weight each kept timestep with n
    skip_sequences = sequences.loc[::n]

    temporal = pd.Series(data=n, index=skip_sequences.index, name="weighting")
    temporal.index.name = "timeindex"

    os.chdir(copied_root)

    for r in sequence_resources:
        write_sequences(
            r.name + ".csv", dfs[r.name].loc[temporal.index], replace=True
        )

    # write temporal information from skipping
    temporal.to_csv(
        "data/temporal.csv",
        header=True,
        sep=";",
        date_format="%Y-%m-%dT%H:%M:%SZ",
    )

    # add meta data for the new temporal information
    r = Resource({"path": "data/temporal.csv"})
    r.infer()
    r.descriptor["description"] = (
        "Temporal selection based on skipped timesteps. Skipped n={}".format(n)
    )

    # update meta data of the copied package
    cp = Package("datapackage.json")
    cp.descriptor["name"] = copied_package_name
    cp.descriptor["resources"].append(r.descriptor)
    cp.commit()
    cp.save("datapackage.json")

    # change back to the old working directory
    os.chdir(cwd)

    return copied_root
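
# A minimal usage sketch (not part of the module): assuming a valid tabular
# datapackage described by a "datapackage.json" in the current working
# directory, keep every fourth timestep and store the reduced package under
# /tmp. The file name, path, and package name here are hypothetical, chosen
# only for illustration.
#
#     from oemof.tabular.datapackage.aggregation import temporal_skip
#
#     copied_root = temporal_skip(
#         "datapackage.json", n=4, path="/tmp", name="my-package-skip-4"
#     )
#     # `copied_root` is the root directory of the new datapackage; its
#     # data/temporal.csv weights every remaining timestep with n=4.
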
def temporal_clustering(datapackage, n, path="/tmp", how="daily"):
    """
    Creates a new datapackage by aggregating the sequences inside the
    `sequence` folder of the specified datapackage, clustering them into
    `n` typical periods.

    Parameters
    ----------
    datapackage: string
        Path to the meta data file datapackage.json
    n: integer
        Number of clusters
    path: string
        Path to the directory where the aggregated datapackage is stored
    how: string
        How to cluster: 'daily' or 'hourly'
    """
    if how == "weekly":
        raise NotImplementedError("Weekly clustering is not implemented!")

    p = Package(datapackage)

    cwd = os.getcwd()

    copied_package_name = (
        p.descriptor["name"] + "__temporal_cluster__" + how + "_" + str(n)
    )

    copy_path = os.path.join(path, p.descriptor["name"], copied_package_name)

    copied_root = copy_datapackage(
        datapackage, os.path.abspath(copy_path), subset="data"
    )

    sequence_resources = [
        r
        for r in p.resources
        if re.match(r"^data/sequences/.*$", r.descriptor["path"])
    ]

    dfs = {
        r.name: pd.DataFrame(r.read(keyed=True))
        .set_index("timeindex")
        .astype(float)
        for r in sequence_resources
    }
    sequences = pd.concat(dfs.values(), axis=1)

    if how == "daily":
        hoursPerPeriod = 24
    elif how == "hourly":
        hoursPerPeriod = 1
    elif how == "weekly":
        # unreachable while weekly clustering raises above; kept for later use
        hoursPerPeriod = 24 * 7

    aggregation = tsam.TimeSeriesAggregation(
        sequences,
        noTypicalPeriods=n,
        rescaleClusterPeriods=False,
        hoursPerPeriod=hoursPerPeriod,
        clusterMethod="hierarchical",
    )

    # map each cluster center index to the number of periods it represents
    cluster_weights = {
        aggregation.clusterCenterIndices[i]: w
        for i, w in aggregation.clusterPeriodNoOccur.items()
    }

    if how == "daily":
        temporal = pd.Series(
            {
                d: cluster_weights[d.dayofyear]
                for d in sequences.index
                if d.dayofyear in aggregation.clusterCenterIndices
            },
            name="weighting",
        )
        temporal.index.name = "timeindex"
    elif how == "hourly":
        temporal = pd.Series(
            {
                h: cluster_weights[sequences.index.get_loc(h)]
                for h in sequences.index
                if sequences.index.get_loc(h)
                in aggregation.clusterCenterIndices
            },
            name="weighting",
        )
        temporal.index.name = "timeindex"

    # write resources to the copied package (should not interfere with the
    # meta data, as columns are not removed and are sorted when written)
    os.chdir(copied_root)

    for r in sequence_resources:
        write_sequences(
            r.name + ".csv", dfs[r.name].loc[temporal.index], replace=True
        )

    # write temporal information from clustering
    temporal.to_csv(
        "data/temporal.csv",
        header=True,
        sep=";",
        date_format="%Y-%m-%dT%H:%M:%SZ",
    )

    # add meta data for the new temporal information
    r = Resource({"path": "data/temporal.csv"})
    r.infer()
    # TODO: Add a more detailed meta-data description
    r.descriptor["description"] = (
        "Temporal selection based on hierarchical clustering..."
    )

    # update meta data of the copied package
    cp = Package("datapackage.json")
    cp.descriptor["name"] = copied_package_name
    cp.descriptor["resources"].append(r.descriptor)
    cp.commit()
    cp.save("datapackage.json")

    # change back to the old working directory
    os.chdir(cwd)

    return copied_root
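
# A minimal usage sketch (not part of the module): assuming the same
# hypothetical "datapackage.json" as above, cluster the sequences into 10
# typical days; the weights written to data/temporal.csv then reflect how
# many original days each representative day stands for.
#
#     from oemof.tabular.datapackage.aggregation import temporal_clustering
#
#     copied_root = temporal_clustering(
#         "datapackage.json", n=10, path="/tmp", how="daily"
#     )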