# Source code for Nyoka's Pre-Processing Module

import queue
import numpy as np
from sklearn.utils import check_array
from sklearn.base import TransformerMixin
# NumPy floating-point scalar types (64-, 32- and 16-bit).
# NOTE(review): not referenced by the code visible in this section —
# presumably meant for dtype checks elsewhere; confirm before removing.
FLOAT_DTYPES = (np.float64, np.float32, np.float16)


class Lag(TransformerMixin):
    """
    Aggregate the previous `value` records of every field.

    For each row, the transformer takes the `value` rows that precede it
    (the row itself is not included) and replaces every field with the
    result of applying `aggregation` to that window. Rows that have fewer
    than `value` predecessors get the missing part of their window filled
    with ``0.0``.

    Parameters
    ----------
    aggregation : String
        Aggregation type. The valid types are
        ["min", "max", "sum", "avg", "median", "product", "stddev"]
    value : Integer (default = 1)
        The number of previous records to aggregate
    copy : Boolean (default = True)
        Forwarded to ``check_array`` when the input is validated

    Raises
    ------
    AssertionError
        If `aggregation` is not one of the valid types
    """

    _VALID_AGGS = ["min", "max", "sum", "avg", "median", "product", "stddev"]

    # `np.prod` replaces the `np.product` alias, which no longer exists in
    # NumPy 2.0; looking it up here would break the import of this module.
    _AGG_FUNC_MAP = {
        "min": np.min,
        "max": np.max,
        "sum": np.sum,
        "avg": np.mean,
        "median": np.median,
        "product": np.prod,
        "stddev": np.std,
    }

    def __init__(self, aggregation, value=1, copy=True):
        if aggregation not in self._VALID_AGGS:
            # Raised explicitly so the check survives `python -O`; the
            # exception type is kept as AssertionError so callers that
            # already catch it keep working.
            raise AssertionError(
                f"Invalid `aggregation` type. Valid types are {self._VALID_AGGS}"
            )
        self.aggregation = aggregation
        self.value = value
        self.copy = copy

    def fit(self, X, y=None):
        """
        Does nothing.

        Returns
        -------
        The same object
        """
        return self

    def transform(self, X, y=None):
        """
        Transforms the given X by taking `value` number of previous records
        and applying the `aggregation` method to them.

        Parameters
        ----------
        X : Pandas DataFrame or numpy array
            The input data
        y :
            It is ignored.

        Returns
        -------
        Transformed X as numpy array
        """
        # `warn_on_dtype` is intentionally not passed: the keyword has been
        # removed from `check_array` and raises TypeError on current
        # scikit-learn releases.
        X = check_array(X, copy=self.copy, estimator=self)
        aggregate = self._AGG_FUNC_MAP[self.aggregation]
        window = self.value
        n_rows, n_cols = X.shape

        # Prepend `window` rows of zeros so that the slice
        # padded[i:i + window] holds exactly the `window` records that come
        # before row i of X (zero-filled where no such record exists).
        padded = np.vstack([np.zeros((window, n_cols)), X])

        # One aggregation per row, computed for all columns at once.
        # The per-row results are kept on the instance, as before.
        self._transformed_X = [
            aggregate(padded[start:start + window], axis=0)
            for start in range(n_rows)
        ]
        return np.array(self._transformed_X)

    def __repr__(self):
        return f"Lag(aggregation='{self.aggregation}', value={self.value})"