Workflow to build Sklearn pipelines
Practical example of how to build scikit-learn pipelines and custom transformers
What is a Sklearn Pipeline?
The Sklearn pipeline is used to organize several steps of data preprocessing and modeling into a function, is a tool to improve readability, make it easier to accomplish different experiments, and make everything done more reproducible.
Pipelines have two things. First, transformers that are used to transform a dataset and implement transform() and fit() methods. Second, an estimator that learns from the data and only needs to implement a fit() method.
Hands on the problem
We are going to use the TMDB 5000 Movie dataset. The main objective is to learn how to build a pipeline with scikit-learn besides creating custom-transformers and understand the workflow to apply pipelines for other problems. In the article, all those concepts of custom transformers and best practices for building a pipeline will be covered.
First of all, we need to import the data from Kaggle, in the read_csv function, setting a few parameters to get rid of some NaN values we could find in the dataset.
movies = pd.read_csv("/content/drive/MyDrive/Datasets/tmdb_5000_movies.csv", na_filter=True, na_values='[]')
credits = pd.read_csv("/content/drive/MyDrive/Datasets/tmdb_5000_credits.csv")
If we take a look in the dataset, it is perceptive that there are some features in JSON, numeric and string format, which will need preprocessing functions to be suitable for the model later on. All those preprocessing steps to work in a pipeline need to be in a few ready and custom transformers.
movies.head(2)
We can have an idea of which features to use, through Pandas correlation function, as it is possible to see below, the columns “vote_count” and “budget” have strong correlation with revenue.
corr_matrix = movies.corr()
corr_matrix["revenue"].sort_values(ascending=False)
The next step is to decide which features to use for a better prediction in revenue. Based on the results of correlation and assumptions, those columns below will be used.
num_columns_to_use = ['budget', 'popularity', 'vote_average', 'vote_count']
str_columns_to_use = ['genres', 'production_companies']
Drop NaN values for string columns
movies = movies.dropna(subset=['production_companies', 'genres'])
movies.reset_index(drop=True, inplace=True)
Next we will create a function to get the values with the most appearances in ‘production_companies’ and ‘genres’ columns. The function will run in the entire dataset, then return the values to an array to be used later on.
from ast import literal_eval
def values_with_most_appearences(X, columns):
values = []
X_copy = X.copy()
for column in columns:
X_copy[column] = X_copy[column].apply(literal_eval)
for row in X.index:
names = [i['name'] for i in X_copy[column][row]]
X_copy[column][row] = names[:2] if len(names) >= 2 else names
temp_df = pd.DataFrame(X_copy[column].to_list(), columns=['feature_1', 'feature_2'])
if column == "production_companies":
new_features_df = pd.Series(np.concatenate([temp_df["feature_1"], temp_df["feature_2"]])).value_counts()[:150].index
else:
feature_1 = temp_df["feature_1"].unique()
feature_2 = temp_df["feature_2"].unique()
new_features_df = np.concatenate([feature_1, feature_2])
new_features_df = list(dict.fromkeys(new_features_df))
values.append(new_features_df)
return values
new_df = values_with_most_appearences(movies, ['genres', 'production_companies'])
print(new_df[0][:3])
print(new_df[1][:3])
Next, we split our data into a train and test set, resetting rows of dataframe as well to prevent future errors.
from sklearn.model_selection import train_test_split
X = movies.drop('revenue', axis=1)
y = movies['revenue']
X_train, X_test, y_train, y_test = train_test_split(movies, y, test_size=0.2, random_state=42)
X_train.reset_index(drop=True, inplace=True)
X_test.reset_index(drop=True, inplace=True)
y_train.reset_index(drop=True, inplace=True)
y_test.reset_index(drop=True, inplace=True)
Building custom transformers
Every transformer is a class, with at least one fit() and transform() method. To be part of a Pipeline in Scikit-learn, one also needs to inherit BaseEstimator and TransformerMixin from the sklearn.base module. The TransformerMixin class runs fit() and transformer() one after another in one call withfit_transform(). The BaseEstimator gets the methods get_params() and set_params() methods that can be useful.
The first custom transformer that we need to build must receive a dataframe without any columns selected or preprocessing steps and receive an array with the columns we want.
Note: It is good practice for preventing some errors to declare the variable in __init__ with the same name that was placed in the parameters.
from sklearn.base import BaseEstimator, TransformerMixin
class ColumnsSelector(BaseEstimator, TransformerMixin):
def __init__(self, columns):
self.columns = columns
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
return X[self.columns]
We have two columns, ‘genres’ and ‘production_companies’, that are in JSON format, with X values for each row. The idea is to get the first two values from each feature, and put it in a list, substituting the whole JSON that was there before.
from ast import literal_eval
class FormatToArrayOfXValues(BaseEstimator, TransformerMixin):
def __init__(self, columns):
self.columns = columns
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
X_copy = X.copy()
for column in self.columns:
X_copy[column] = X_copy[column].apply(literal_eval)
for row in X_copy.index:
names = [i['name'] for i in X_copy[column][row]]
X_copy[column][row] = names[:2] if len(names) >= 2 else names
return X_copy
The last custom transformer for our string pipeline will create a column for each value we got before in our values_with_most_appearences() function, then verify if there is a column for the values we put on a list before (values from ‘genres’ and ‘production_companies’ columns), if exists, assign 1, else 0.
class ValuesToColumns(BaseEstimator, TransformerMixin):
def __init__(self, columns, new_features):
self.columns = columns
self.new_features = new_features
def fit(self, X, y=None):
return self
def transform(self, X, y=None):
X_copy = X.copy()
X_copy.reset_index(drop=True, inplace=True)
for i in range(2):
X_copy = pd.concat([X_copy, pd.DataFrame(columns=self.new_features[i])])
X_copy.fillna(int(0), inplace=True)
for column in self.columns:
for row in range(len(X_copy[column])):
for value in X_copy[column][row]:
if value != None and value in X_copy.columns:
X_copy.loc[row, value] = 1
X_copy.drop(columns=self.columns, inplace=True)
return X_copy
All the custom transformers needed for data processing have been created. Now we can build our first pipeline, bringing all the Transformers to one place.
from sklearn.pipeline import Pipeline
str_pipe = Pipeline(steps=[
("ColumnsSelector", ColumnsSelector(str_columns_to_use)),
("Format", FormatToArrayOfXValues(str_columns_to_use)),
("ValuesToColumns", ValuesToColumns(str_columns_to_use, new_df))
])
Next, we still need to take care of some numeric features we decided to use. The custom transformer, ColumnsSelector, can be used again to select all the numeric columns we want, plus a Transformer from sklearn.preprocessing module, to normalize our data.
from sklearn.preprocessing import MinMaxScaler
num_pipe = Pipeline(steps=[
("ColumnsSelector", ColumnsSelector(num_columns_to_use)),
("Normalize", MinMaxScaler())
])
To join the two pipelines into the same function, we can use ColumnTransformer to perform each preprocessing step into the columns we want and gather them all together.
from sklearn.compose import ColumnTransformer
full_processor = ColumnTransformer(transformers=[
('number', num_pipe, num_columns_to_use),
('category', str_pipe, str_columns_to_use)
])
To run the pipeline, it is simple, just call fit_transform() and we can see our data processed.
example = full_processor.fit_transform(X_train)
It is possible to add a model to our pipeline.
from sklearn.ensemble import RandomForestRegressor
full_pipeline = Pipeline(steps=[
('preprocess', full_processor),
('model', RandomForestRegressor())
])
Hyperparameter tuning is possible also, we can use RandomizedSearchCV or GridSearchCV.
Note: Take care when dealing with indexes in a dataframe and running RandomizedSearch or GridSearch, it can return a lot of errors. That is why we are using some ‘reset_index’ throughout our Transformers.
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import GridSearchCV
search = RandomizedSearchCV(full_pipeline, params, scoring="neg_mean_squared_error", cv=5, error_score='raise')
_ = search.fit(X_train, y_train)
print('Score:', abs(search.best_score_))
We were able to perform all the necessary process to learn how to use Scikit-learn Pipelines and Transformers.
Thank you so much for reading, and if you enjoyed it and are interested in reading other articles, follow me here on Medium.
If you want to know more about my projects, articles or want to contact me, you can connect with me on LinkedIn or follow my GitHub.