Source code for Statsmodels Exporter

"""
 Copyright (c) 2004-2016 Zementis, Inc.
 Copyright (c) 2016-2020 Software AG, Darmstadt, Germany and/or Software AG USA Inc., Reston, VA, USA, and/or its

 SPDX-License-Identifier: Apache-2.0

   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
 """
from __future__ import absolute_import

import sys, os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_DIR)


from pprint import pprint
from PMML44 import *
from datetime import datetime
import metadata
import warnings
import math
from enums import *

[docs]class StatsmodelsToPmml: """ Exports time-series models from statsmodels library into PMML. Parameters: ----------- results_obj: Instance of AR(I)MAResultsWrapper / (SARI/VAR)MAXResultsWrapper from statsmodels pmml_file_name: string Name of the PMML conf_int : list (optional) Confidence intervel. A list of values mentioning the percentage of confidence. e.g., conf_int = [80,95] will create OutputField for lower bound and upper bound of confidence interval with 80% and 95%. model_name : string (optional) Name of the model description : string (optional) Description of the model Returns ------- Generates PMML object and exports it to `pmml_file_name` """ def __init__(self, results_obj=None, pmml_file_name="from_statsmodels.pmml", conf_int=None, model_name=None, description=None): self.results_obj = results_obj self.pmml_file_name = pmml_file_name self.conf_int = conf_int self.model_name = model_name self.description = description self.pmml = None self.construct_pmml() self.export_pmml()
[docs] def export_pmml(self): """ Writes the generated PMML object to given `pmml_file_name` """ pmml = PMML( version=PMML_SCHEMA.VERSION.value, Header=Header( copyright = "Copyright (c) 2018 Software AG", description = self.description, Timestamp = Timestamp(datetime.now()), Application=Application(name="Nyoka",version=metadata.__version__) ), DataDictionary=self.data_dictionary, TimeSeriesModel=[self.ts_model] ) pmml.export(open(self.pmml_file_name,'w'),0)
[docs] def generate_data_dictionary(self): """ Generates DataDictionary Object. The number of DataField is one more than the dimension of the data.\ The extra DataField is a supplementary to hold the value of `h`(horizon) for forecasting. """ data_fields = [] for val in self.y: data_fields.append( DataField( name=val, optype=OPTYPE.CONTINUOUS.value, dataType=DATATYPE.DOUBLE.value ) ) data_fields.append( DataField(name="h", optype=OPTYPE.CONTINUOUS.value, dataType=DATATYPE.INTEGER.value) ) self.data_dictionary = DataDictionary( numberOfFields=len(data_fields), DataField=data_fields )
[docs] def construct_pmml(self): """ Constructs the actual model object. (ARIMA/ TimeSeriesModel) """ if 'int' in str(self.results_obj.model.endog.dtype): self.results_obj.model.endog=self.results_obj.model.endog.astype('float64') self.results_obj.model.data.endog=self.results_obj.model.data.endog.astype('float64') self.data_obj = self.results_obj.data self.model = self.results_obj.model self.y = self.results_obj.data.ynames if self.y.__class__.__name__ == "str": self.y = [self.y.split(".")[-1]] self.generate_data_dictionary() output = self.generate_output() mining_schema = self.generate_mining_schema() time_series_list = self.generate_time_series() arima_model = None state_space_model = None if self.model.__class__.__name__ in ['ARMA', 'ARIMA']: self.model_name = self.model_name if self.model_name else "ArimaModel" self.description = self.description if self.description else "Non-Seasonal Arima Model" if hasattr(self.results_obj,"fit_details"): best_fit = TIMESERIES_ALGORITHM.STATE_SPACE_MODEL.value state_space_model = self.generate_state_space_model() else: best_fit = TIMESERIES_ALGORITHM.ARIMA.value arima_model = self.generate_arima_model() elif self.model.__class__.__name__ in ['VARMAX','SARIMAX']: best_fit = TIMESERIES_ALGORITHM.STATE_SPACE_MODEL.value self.model_name = self.model_name if self.model_name else self.model.__class__.__name__ self.description = self.description if self.description else "State Space Model" state_space_model = self.generate_state_space_model() else: raise NotImplementedError("Not Implemented. Currently we support only ARMA, ARIMA, SARIMAX and VARMAX.") self.ts_model = TimeSeriesModel( modelName=self.model_name, functionName=MINING_FUNCTION.TIMESERIES.value, bestFit=best_fit, MiningSchema=mining_schema, Output=output, TimeSeries=time_series_list, ARIMA=arima_model, StateSpaceModel=state_space_model )
[docs] def generate_state_space_model(self): """ Constructs StateSpaceModel object. For the following models -\ - `statsmodels.tsa.statespace.sarimax.SARIMAX` - `statsmodels.tsa.statespace.varmax.VARMAX` - `statsmodels.tsa.statespace.tsa.arima.ARIMA` """ import numpy as np np.set_printoptions(precision=12) selected_state_cov_matrix = None predicted_state_cov_matrix = None observation_cov_matrix = None smoother_results = self.results_obj.smoother_results S_t0 = self.results_obj.filtered_state[...,-1] F_matrix = smoother_results.transition[...,-1] #transition_matrix G = smoother_results.design[...,-1] #measurement_matrix if self.model.__class__.__name__ in ["ARMA","ARIMA"]: intercept = [smoother_results.obs_intercept[...,-1][0]] intercept_type = "observation" S_t1 = np.dot(F_matrix, S_t0) else: #state_intercept might contain `nan` values. So here it is generated manually. intercept = np.zeros(S_t0.shape) if self.model.k_trend: if self.model.__class__.__name__ == 'VARMAX': mu = self.results_obj.params[self.model._params_trend] if mu.__class__.__name__ == 'Series': mu = mu.values intercept[:len(mu)] += mu else: mu=self.results_obj._params_trend[0] if mu.__class__.__name__ == 'Series': mu = mu.values spec = self.results_obj.specification k_state = spec['k_diff']+spec['seasonal_periods']*spec['k_seasonal_diff'] intercept[k_state] += mu intercept_type = "state" S_t1 = np.dot(F_matrix, S_t0) + intercept arr_content = " ".join([str(val) for val in intercept]) arr = ArrayType(type_=ARRAY_TYPE.REAL.value,content=arr_content, n=len(intercept)) intercept_vector = InterceptVector(Array=arr, type_=intercept_type) t_mat = Matrix(nbRows=F_matrix.shape[0], nbCols=F_matrix.shape[1]) for row in F_matrix: array_content = " ".join([str(val) for val in row]) t_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value)) transition_matrix = TransitionMatrix(Matrix=t_mat) m_mat = Matrix(nbRows=G.shape[0], nbCols=G.shape[1]) for row in G: array_content = " ".join([str(val) for val in row]) m_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value)) measurement_matrix = MeasurementMatrix(Matrix=m_mat) arr_content = " ".join([str(val) for val in S_t1]) arr = ArrayType(type_=ARRAY_TYPE.REAL.value,content=arr_content, n=len(S_t1)) final_state_vector = FinalStateVector(Array=arr) #For confidence interval if self.conf_int is not None: R = smoother_results.selection[...,-1] # selection matrix Q = smoother_results.state_cov[...,-1] # state_covariance matrix obs_cov = smoother_results.obs_cov[..., -1] # observation covariance R_Q_R_prime = np.dot(R,np.dot(Q,R.T)) # selected_state_cov P_t0 = smoother_results.predicted_state_cov[...,-1] # predicted_state_cov RQR_mat = Matrix(nbRows=R_Q_R_prime.shape[0], nbCols=R_Q_R_prime.shape[1]) for row in R_Q_R_prime: array_content = " ".join([str(val) for val in row]) RQR_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value)) selected_state_cov_matrix = SelectedStateCovarianceMatrix(Matrix=RQR_mat) p_mat = Matrix(nbRows=P_t0.shape[0], nbCols=P_t0.shape[1]) for row in P_t0: array_content = " ".join([str(val) for val in row]) p_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value)) predicted_state_cov_matrix = PredictedStateCovarianceMatrix(Matrix=p_mat) h_mat = Matrix(nbRows=obs_cov.shape[0], nbCols=obs_cov.shape[1]) for row in obs_cov: array_content = " ".join([str(val) for val in row]) h_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value)) observation_cov_matrix = ObservationVarianceMatrix(Matrix=h_mat) state_space_model = StateSpaceModel( StateVector=final_state_vector, TransitionMatrix=transition_matrix, MeasurementMatrix=measurement_matrix, InterceptVector=intercept_vector, SelectedStateCovarianceMatrix=selected_state_cov_matrix, PredictedStateCovarianceMatrix=predicted_state_cov_matrix, ObservationVarianceMatrix=observation_cov_matrix ) return state_space_model
[docs] def generate_arima_model(self): """ Constructs ARIMA object. Only for `statsmodels.tsa.arima_model.ARIMA` class. """ p = self.results_obj.k_ar q = self.results_obj.k_ma d = getattr(self.results_obj,'k_diff',0) ar = None ma = None if p > 0: ar_content = ' '.join([str(i) for i in self.results_obj.arparams]) ar_params_array = ArrayType(content = ar_content, n = p, type_ = ARRAY_TYPE.REAL.value) ar = AR(Array = ar_params_array) if q > 0: ma_content = ' '.join([str(coeff) for coeff in self.results_obj.maparams]) ma_coeff_array = ArrayType(content = ma_content, n = q, type_ = ARRAY_TYPE.REAL.value) ny_maCoef_obj = MACoefficients(Array = ma_coeff_array) residuals = self.results_obj.resid[-q:] resid_content = ' '.join([str(res) for res in residuals]) resid_array = ArrayType(content = resid_content, n = q, type_ = ARRAY_TYPE.REAL.value) residual_obj = Residuals(Array = resid_array) ma = MA(MACoefficients = ny_maCoef_obj, Residuals = residual_obj) const_term = 0 if self.results_obj.k_trend: const_term = self.results_obj.params[0] non_seasonal_comp = NonseasonalComponent(p = p, d = d, q = q, AR = ar, MA = ma) rmse = math.sqrt(self.model.sigma2) arima_obj = ARIMA(constantTerm = const_term, predictionMethod = ARIMA_PREDICTION_METHOD.CSS.value, RMSE=rmse, NonseasonalComponent = non_seasonal_comp ) return arima_obj
[docs] def generate_time_value_object(self, data): """ Generates TimeValue object. If data has any index, then the index will be in TimeStamp object. """ time_values = [] indices = self.data_obj.dates for data_idx in range(len(data)): tv = TimeValue(index=data_idx,value=data[data_idx],\ Timestamp=Timestamp(str(indices[data_idx])) if indices is not None else None) time_values.append(tv) return time_values
[docs] def generate_time_series(self): """ Generates TimeSeries object. The number of TimeSeries object is equal to the dimeansion of the data. """ time_series_list = [] if self.data_obj.endog.ndim == 1: ts = TimeSeries(usage = TIMESERIES_USAGE.ORIGINAL.value, field=self.y[0], startTime = 0,\ endTime = len(self.data_obj.endog) - 1,\ TimeValue = self.generate_time_value_object(self.data_obj.endog)) time_series_list.append(ts) else: for i in range(self.data_obj.endog.shape[-1]): ts = TimeSeries(usage = TIMESERIES_USAGE.ORIGINAL.value, field=self.y[i], startTime = 0,\ endTime = len(self.data_obj.endog) - 1,\ TimeValue = self.generate_time_value_object(self.data_obj.endog[...,i])) time_series_list.append(ts) return time_series_list
[docs] def generate_output(self): """ Generates Output object. If user provides value in `conf_int` parameter, then there will be two OuputField\ for each value. One with `feature=confidenceIntervalLower` and another with `feature=confidenceIntervalUpper`. """ out_flds = [] for y_ in self.y: out_flds.append( OutputField( name="predicted_"+y_, optype=OPTYPE.CONTINUOUS.value, dataType=DATATYPE.STRING.value, feature=RESULT_FEATURE.PREDICTED_VALUE.value, Extension=[Extension(extender="ADAPA",name="dataType",value="json")] ) ) if self.conf_int is not None: lower = [] upper = [] for percent in self.conf_int: for y_ in self.y: lower.append( OutputField( name=f"conf_int_{percent}_lower_{y_}", optype=OPTYPE.CONTINUOUS.value, dataType=DATATYPE.STRING.value, targetField=y_, feature=RESULT_FEATURE.CONFIDENCE_INTERVAL_LOWER.value, value=percent, Extension=[Extension(extender="ADAPA",name="dataType",value="json")] ) ) upper.append( OutputField( name=f"conf_int_{percent}_upper_{y_}", optype=OPTYPE.CONTINUOUS.value, dataType=DATATYPE.STRING.value, targetField=y_, feature=RESULT_FEATURE.CONFIDENCE_INTERVAL_UPPER.value, value=percent, Extension=[Extension(extender="ADAPA",name="dataType",value="json")] ) ) out_flds.extend(lower + upper) return Output(OutputField=out_flds)
[docs] def generate_mining_schema(self): """ Generates MiningSchema object. """ mining_fields = [] for y_ in self.y: mining_fields.append(MiningField(name = y_, usageType = FIELD_USAGE_TYPE.TARGET.value)) mining_fields.append(MiningField(name = 'h', usageType = FIELD_USAGE_TYPE.SUPPLEMENTARY.value)) return MiningSchema(MiningField=mining_fields)