Data Preprocessing
Preprocessor():¶
This class bundles all the data pre-processing techniques used to prepare data before it is fed to the machine learning models.
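The method bodies below rely on the usual module-level imports. A minimal sketch of what those would be, inferred from the calls the methods make:

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.feature_selection import VarianceThreshold
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from imblearn.over_sampling import SMOTE, RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler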
get_data_profile¶
def get_data_profile(self, data):
    self.data_profile = {}
    self.missing_values = {}
    self.missing_val_pct = {}
    self.data_profile['rows'] = data.shape[0]
    self.data_profile['columns'] = data.shape[1]
    for col in data.columns:
        if data[col].isnull().sum() > 0:
            self.missing_values[col] = data[col].isnull().sum()
            self.missing_val_pct[col] = (data[col].isnull().sum() / len(data)) * 100
    self.data_profile['missing_values'] = self.missing_values
    self.data_profile['missing_vals_pct'] = self.missing_val_pct
    # split columns by dtype: non-numeric columns are treated as categorical
    self.data_profile['categorical_columns'] = list(data.select_dtypes(exclude=["number"]))
    self.data_profile['num_categorical_columns'] = len(self.data_profile['categorical_columns'])
    self.data_profile['numerical_columns'] = list(data.select_dtypes(include=["number"]))
    self.data_profile['num_numerical_columns'] = len(self.data_profile['numerical_columns'])
    self.data_profile['num_duplicate_rows'] = data.duplicated().sum()
    self.describe = data.describe().T
    self.standard_deviation = self.describe[self.describe['std'] == 0]
    self.standard_deviation = list(self.standard_deviation.index)
    self.data_profile['num_col_with_std_zero'] = len(self.standard_deviation)
    if len(self.standard_deviation) > 0:
        self.data_profile['cols_with_std_zero'] = self.standard_deviation
    # memory_usage reports actual bytes; data.size only counts cells
    self.size = data.memory_usage(deep=True).sum() / (1024 * 1024)
    self.data_profile['datasize'] = str(round(self.size, 2)) + " MB"
    return self.data_profile
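A standalone sketch of the same profiling ideas on a toy dataframe (the column names are made up for illustration):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "age": [25, 32, np.nan, 41],
    "city": ["NY", "LA", "LA", None],
})
print(df.shape)                                    # (4, 2)
print(df.isna().sum()[lambda s: s > 0].to_dict())  # {'age': 1, 'city': 1}
print(f"{df.memory_usage(deep=True).sum() / 1024 ** 2:.4f} MB")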
separate_label_feature¶
This method separates the features and the target label into X and Y.
def separate_label_feature(self, data, label_column_name):
    self.X = data.drop(labels=label_column_name, axis=1)  # drop the label column to keep only the feature columns
    self.Y = data[label_column_name]  # filter out the label column
    return self.X, self.Y
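For example, with a hypothetical target column the split looks like this:

import pandas as pd

df = pd.DataFrame({"f1": [1, 2, 3], "f2": [4, 5, 6], "target": [0, 1, 0]})
X = df.drop(labels="target", axis=1)  # features only
Y = df["target"]                      # label only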
remove_columns¶
This method removes the given columns from a pandas dataframe.
def remove_columns(self, data, columns):
    self.data = data
    self.columns = columns
    self.useful_data = self.data.drop(labels=self.columns, axis=1)  # drop the labels specified in columns
    return self.useful_data
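Internally this is a plain pandas drop; a toy equivalent with made-up column names:

import pandas as pd

df = pd.DataFrame({"keep_me": [1, 2], "noise": ["a", "b"]})
useful = df.drop(labels=["noise"], axis=1)  # only 'keep_me' remains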
impute_missing_values¶
This method imputes missing values in the dataframe: categorical columns are filled with their mode, and numerical columns with a user-supplied fixed value, the median, or the mean, depending on the chosen strategy.
def impute_missing_values(self, data, mv_flag=None, target=None, strategy='mode', impute_val=None, missing_vals=None):
    dataframe = data
    if mv_flag is True:
        # convert the user-supplied missing-value markers to NaN
        if missing_vals:
            dataframe.replace(missing_vals, np.nan, inplace=True)
        # check for missing values in the dependent variable
        if dataframe[target].isna().any():
            # keep only the rows with no missing values in the dependent column
            dataframe = dataframe[dataframe[target].notna()].copy()
        # find the independent columns that still contain missing data
        missing_data_columns = dataframe.columns[dataframe.isna().any()].tolist()
        if strategy == 'fixed':
            # fill all NaN values with the fixed value supplied by the user
            dataframe.fillna(impute_val, inplace=True)
        else:
            for column in missing_data_columns:  # iterate over the columns having NaN values
                if dataframe[column].dtype == 'object':  # categorical column
                    mode = dataframe[column].mode()[0]
                    dataframe[column] = dataframe[column].fillna(mode)  # impute NaN values with the column mode
                elif strategy == 'median':
                    median = dataframe[column].median()
                    dataframe[column] = dataframe[column].fillna(median)  # impute NaN values with the column median
                else:  # the only remaining strategy is mean
                    mean = dataframe[column].mean()
                    dataframe[column] = dataframe[column].fillna(mean)  # impute NaN values with the column mean
    else:
        self.logger_object.log(self.file_object, "mv_flag found False")
    return dataframe
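A standalone sketch of the imputation rules on a toy dataframe (the column names are illustrative): rows with a missing target are dropped, categorical gaps get the mode, numerical gaps get the median or mean:

import numpy as np
import pandas as pd

df = pd.DataFrame({
    "color": ["red", np.nan, "red", "blue"],
    "price": [10.0, np.nan, 30.0, 20.0],
    "label": [1, 0, 1, np.nan],
})
df = df[df["label"].notna()].copy()                      # drop rows with a missing target
df["color"] = df["color"].fillna(df["color"].mode()[0])  # categorical -> mode
df["price"] = df["price"].fillna(df["price"].median())   # numerical -> median (or mean)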
type_conversion¶
This method converts column datatypes from categorical to numerical or vice versa, where possible.
def type_conversion(self, dataset, cat_to_num=None, num_to_cat=None):
    if cat_to_num is not None:
        for column in cat_to_num:
            dataset[column] = pd.to_numeric(dataset[column])  # categorical -> numerical
    if num_to_cat is not None:
        for column in num_to_cat:
            dataset[column] = dataset[column].astype('object')  # numerical -> categorical
    return dataset
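For example, converting in both directions on a toy dataframe:

import pandas as pd

df = pd.DataFrame({"amount": ["1", "2", "3"], "code": [7, 8, 9]})
df["amount"] = pd.to_numeric(df["amount"])  # categorical -> numerical
df["code"] = df["code"].astype("object")    # numerical -> categorical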
remove_imbalance¶
- Used to handle imbalanced datasets (rare classes) through oversampling/undersampling techniques.
- Input Description: data: the input dataframe with target column. threshold: the allowed percentage mismatch between the target class frequencies before balancing is performed.
- Output: A balanced dataframe.
def remove_imbalance(self, data, target, threshold=10.0, oversample=True, smote=False):
    X = data.drop(target, axis=1)
    y = data[target]
    self.logger_object.log(self.file_object,
                           'Class Imbalance Process Starts in the remove_imbalance method of the DataPreprocessor class')
    no_of_classes = data[target].nunique()
    class_pct = data[target].value_counts() / float(len(data[target])) * 100
    if no_of_classes == 2:
        # balance only if the minority class falls below the threshold percentage
        thresh_satisfied = class_pct.min() < threshold
        if thresh_satisfied:
            if smote:
                sm = SMOTE()
                X, y = sm.fit_resample(X, y)
            elif oversample:
                ROS = RandomOverSampler(sampling_strategy='auto', random_state=42)
                X, y = ROS.fit_resample(X, y)
            else:
                RUS = RandomUnderSampler(sampling_strategy='auto', random_state=42)
                X, y = RUS.fit_resample(X, y)
    else:
        high = class_pct.max()
        low = class_pct.min()
        thresh_satisfied = (high - low > 100.0 - threshold)
        if thresh_satisfied:
            if smote:
                for i in range(no_of_classes - 2):
                    sm = SMOTE()
                    X, y = sm.fit_resample(X, y)
            elif oversample:
                for i in range(no_of_classes - 2):
                    ROS = RandomOverSampler(sampling_strategy='auto', random_state=42)
                    X, y = ROS.fit_resample(X, y)
            else:
                for i in range(no_of_classes - 2):
                    RUS = RandomUnderSampler(sampling_strategy='auto', random_state=42)
                    X, y = RUS.fit_resample(X, y)
    y = y.to_frame(name=target)  # keep the target column name after resampling
    dfBalanced = pd.concat([X, y], axis=1)
    return dfBalanced
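A standalone sketch of the binary oversampling path, using a made-up 8:2 class split:

import pandas as pd
from imblearn.over_sampling import RandomOverSampler

df = pd.DataFrame({"f1": range(10), "target": [0] * 8 + [1] * 2})
X, y = df.drop("target", axis=1), df["target"]
X_res, y_res = RandomOverSampler(sampling_strategy='auto', random_state=42).fit_resample(X, y)
print(y_res.value_counts().to_dict())  # {0: 8, 1: 8}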
remove_columns_with_minimal_variance¶
This method drops any numerical column whose variance falls below threshold * (1 - threshold), the variance of a Bernoulli variable with parameter threshold. * Input : data: the input DataFrame whose numerical columns are checked; threshold: the cut-off used to derive the variance limit. * Output: A DataFrame with the low-variance numerical columns dropped.
def remove_columns_with_minimal_variance(self, data, threshold):
    sel = VarianceThreshold(threshold=(threshold * (1 - threshold)))
    columnlist = list(data.select_dtypes(include='number').columns)
    sel.fit(data[columnlist])  # fit on the numerical columns only
    # map the support mask back to the numerical column names before dropping
    low_variance_cols = [col for col, keep in zip(columnlist, sel.get_support()) if not keep]
    new_data = data.drop(columns=low_variance_cols)
    return new_data  # return the filtered data to the calling method
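For example, with threshold=0.1 the variance cut-off is 0.09, so a constant column is dropped while a varying one survives:

import pandas as pd
from sklearn.feature_selection import VarianceThreshold

df = pd.DataFrame({"constant": [1, 1, 1, 1], "varies": [0, 1, 0, 1]})
sel = VarianceThreshold(threshold=0.1 * (1 - 0.1))
sel.fit(df)
print(list(df.columns[sel.get_support()]))  # ['varies']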
standardize_data¶
This method will be used to standardize all the numeric variables, so that mean = 0 and std dev = 1. * Input : the input dataframe with numeric columns. * Output: Standardized data where the mean of each column will be 0 and the standard deviation will be 1.
def standardize_data(self, dataframe):
    data = dataframe
    stdscalar = StandardScaler()
    scaled_data = stdscalar.fit_transform(data)  # returns a NumPy array
    scaled_data = pd.DataFrame(data=scaled_data, columns=data.columns)  # wrap back with the original column names
    return scaled_data
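A quick standalone check of the standardization on toy numbers:

import pandas as pd
from sklearn.preprocessing import StandardScaler

df = pd.DataFrame({"a": [1.0, 2.0, 3.0], "b": [10.0, 20.0, 30.0]})
scaled = pd.DataFrame(StandardScaler().fit_transform(df), columns=df.columns)
print(scaled.mean().round(6).tolist())       # [0.0, 0.0]
print(scaled.std(ddof=0).round(6).tolist())  # [1.0, 1.0]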
normalize_data¶
This method will be used to normalize all the numeric variables, so that min value = 0 and max value = 1. * Input : the input dataframe with numeric columns. * Output: Normalized data where the minimum value of each column will be 0 and the maximum value of each column will be 1.
def normalize_data(self, dataframe):
    data = dataframe
    normalizer = MinMaxScaler()
    normalized_data = normalizer.fit_transform(data)  # returns a NumPy array
    normalized_data = pd.DataFrame(data=normalized_data, columns=data.columns)  # wrap back with the original column names
    return normalized_data
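And the same kind of check for min-max normalization:

import pandas as pd
from sklearn.preprocessing import MinMaxScaler

df = pd.DataFrame({"a": [1.0, 2.0, 3.0]})
normalized = pd.DataFrame(MinMaxScaler().fit_transform(df), columns=df.columns)
print(normalized["a"].tolist())  # [0.0, 0.5, 1.0]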
pca¶
This method reduces the dimensionality of the scaled data, which speeds up working with large data files. * Input : data which has already been scaled, var_explained = 0.90 (default value) * Output : the scaled data with reduced dimensions.
def pca(self, data, var_explained=0.90):
    self.data = data
    self.var_explained = var_explained
    n = len(data.keys())  # find out the number of columns in the data
    mat_pca = PCA(n_components=n)
    mat_pca.fit(data)  # fit the full PCA model
    # calculate the cumulative explained-variance ratio as a fraction,
    # so it is directly comparable with var_explained
    cum_var = np.cumsum(mat_pca.explained_variance_ratio_)

    def calc_num_components(cum_var, var_explained):
        # number of principal components needed to reach var_explained
        for i in range(n):
            if cum_var[i] >= var_explained:
                return i + 1

    # call the function to calculate num_components
    n_components = calc_num_components(cum_var, var_explained)
    # create the PCA instance with the chosen number of components
    pca = PCA(n_components=n_components)
    principal_components = pca.fit_transform(data)
    # convert into a dataframe
    pca_data = pd.DataFrame(data=principal_components,
                            columns=['PC' + str(i) for i in range(1, n_components + 1)])
    return pca_data
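A standalone sketch of the same component selection on random data (the 0.90 target and column names are illustrative):

import numpy as np
import pandas as pd
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
data = pd.DataFrame(rng.normal(size=(100, 5)), columns=list("abcde"))
cum_var = np.cumsum(PCA(n_components=5).fit(data).explained_variance_ratio_)
n_components = int(np.searchsorted(cum_var, 0.90) + 1)  # smallest k reaching 90% variance
pca_data = pd.DataFrame(PCA(n_components=n_components).fit_transform(data),
                        columns=['PC' + str(i) for i in range(1, n_components + 1)])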
preprocess¶
This method preprocesses the data by calling the APIs listed above. * input: dataset, target column, unwanted columns * output: preprocessed feature and target datasets
def preprocess(self, dataset, target_column, unwanted_cols):
    dataset = self.remove_columns(dataset, columns=['Unnamed: 0'])
    dataset = self.impute_missing_values(data=dataset, mv_flag=True, target=target_column)
    # dataset = self.remove_columns(dataset, unwanted_cols)
    dataset = self.remove_imbalance(dataset, target_column, threshold=10.0, oversample=True, smote=False)
    dataset, self.y = self.separate_label_feature(dataset, target_column)
    self.x = self.remove_columns_with_minimal_variance(data=dataset, threshold=0.1)
    return self.x, self.y
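A hypothetical end-to-end invocation (the CSV path and target name are placeholders, and the constructor arguments depend on how the class is wired up in the project):

import pandas as pd

preprocessor = Preprocessor()   # constructor arguments omitted; see the project setup
raw = pd.read_csv("train.csv")  # placeholder path
X, y = preprocessor.preprocess(raw, target_column="Outcome", unwanted_cols=[])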