Workflow to build Sklearn pipelines

João Pedro dos Santos
6 min readJan 3, 2023

Practical example of how to build scikit-learn pipelines and custom transformers

Photo by realaxer on Unsplash

What is a Sklearn Pipeline?

The Sklearn pipeline is used to organize several steps of data preprocessing and modeling into a function, is a tool to improve readability, make it easier to accomplish different experiments, and make everything done more reproducible.

Pipelines have two things. First, transformers that are used to transform a dataset and implement transform() and fit() methods. Second, an estimator that learns from the data and only needs to implement a fit() method.

Hands on the problem

We are going to use the TMDB 5000 Movie dataset. The main objective is to learn how to build a pipeline with scikit-learn besides creating custom-transformers and understand the workflow to apply pipelines for other problems. In the article, all those concepts of custom transformers and best practices for building a pipeline will be covered.

First of all, we need to import the data from Kaggle, in the read_csv function, setting a few parameters to get rid of some NaN values we could find in the dataset.

movies = pd.read_csv("/content/drive/MyDrive/Datasets/tmdb_5000_movies.csv", na_filter=True, na_values='[]')
credits = pd.read_csv("/content/drive/MyDrive/Datasets/tmdb_5000_credits.csv")

If we take a look in the dataset, it is perceptive that there are some features in JSON, numeric and string format, which will need preprocessing functions to be suitable for the model later on. All those preprocessing steps to work in a pipeline need to be in a few ready and custom transformers.

movies.head(2)

We can have an idea of which features to use, through Pandas correlation function, as it is possible to see below, the columns “vote_count” and “budget” have strong correlation with revenue.

corr_matrix = movies.corr()
corr_matrix["revenue"].sort_values(ascending=False)

The next step is to decide which features to use for a better prediction in revenue. Based on the results of correlation and assumptions, those columns below will be used.

num_columns_to_use = ['budget', 'popularity', 'vote_average', 'vote_count']
str_columns_to_use = ['genres', 'production_companies']

Drop NaN values for string columns

movies = movies.dropna(subset=['production_companies', 'genres'])
movies.reset_index(drop=True, inplace=True)

Next we will create a function to get the values with the most appearances in ‘production_companies’ and ‘genres’ columns. The function will run in the entire dataset, then return the values to an array to be used later on.

from ast import literal_eval

def values_with_most_appearences(X, columns):
values = []
X_copy = X.copy()

for column in columns:
X_copy[column] = X_copy[column].apply(literal_eval)

for row in X.index:
names = [i['name'] for i in X_copy[column][row]]
X_copy[column][row] = names[:2] if len(names) >= 2 else names

temp_df = pd.DataFrame(X_copy[column].to_list(), columns=['feature_1', 'feature_2'])

if column == "production_companies":
new_features_df = pd.Series(np.concatenate([temp_df["feature_1"], temp_df["feature_2"]])).value_counts()[:150].index
else:
feature_1 = temp_df["feature_1"].unique()
feature_2 = temp_df["feature_2"].unique()

new_features_df = np.concatenate([feature_1, feature_2])
new_features_df = list(dict.fromkeys(new_features_df))

values.append(new_features_df)
return values

new_df = values_with_most_appearences(movies, ['genres', 'production_companies'])
print(new_df[0][:3])
print(new_df[1][:3])

Next, we split our data into a train and test set, resetting rows of dataframe as well to prevent future errors.

from sklearn.model_selection import train_test_split

X = movies.drop('revenue', axis=1)
y = movies['revenue']

X_train, X_test, y_train, y_test = train_test_split(movies, y, test_size=0.2, random_state=42)

X_train.reset_index(drop=True, inplace=True)
X_test.reset_index(drop=True, inplace=True)
y_train.reset_index(drop=True, inplace=True)
y_test.reset_index(drop=True, inplace=True)

Building custom transformers

Every transformer is a class, with at least one fit() and transform() method. To be part of a Pipeline in Scikit-learn, one also needs to inherit BaseEstimator and TransformerMixin from the sklearn.base module. The TransformerMixin class runs fit() and transformer() one after another in one call withfit_transform(). The BaseEstimator gets the methods get_params() and set_params() methods that can be useful.

The first custom transformer that we need to build must receive a dataframe without any columns selected or preprocessing steps and receive an array with the columns we want.

Note: It is good practice for preventing some errors to declare the variable in __init__ with the same name that was placed in the parameters.

from sklearn.base import BaseEstimator, TransformerMixin

class ColumnsSelector(BaseEstimator, TransformerMixin):
def __init__(self, columns):
self.columns = columns

def fit(self, X, y=None):
return self

def transform(self, X, y=None):
return X[self.columns]

We have two columns, ‘genres’ and ‘production_companies’, that are in JSON format, with X values for each row. The idea is to get the first two values from each feature, and put it in a list, substituting the whole JSON that was there before.

from ast import literal_eval

class FormatToArrayOfXValues(BaseEstimator, TransformerMixin):
def __init__(self, columns):
self.columns = columns

def fit(self, X, y=None):
return self

def transform(self, X, y=None):
X_copy = X.copy()

for column in self.columns:
X_copy[column] = X_copy[column].apply(literal_eval)

for row in X_copy.index:
names = [i['name'] for i in X_copy[column][row]]
X_copy[column][row] = names[:2] if len(names) >= 2 else names

return X_copy

The last custom transformer for our string pipeline will create a column for each value we got before in our values_with_most_appearences() function, then verify if there is a column for the values we put on a list before (values from ‘genres’ and ‘production_companies’ columns), if exists, assign 1, else 0.

class ValuesToColumns(BaseEstimator, TransformerMixin):
def __init__(self, columns, new_features):
self.columns = columns
self.new_features = new_features

def fit(self, X, y=None):
return self

def transform(self, X, y=None):
X_copy = X.copy()
X_copy.reset_index(drop=True, inplace=True)

for i in range(2):
X_copy = pd.concat([X_copy, pd.DataFrame(columns=self.new_features[i])])

X_copy.fillna(int(0), inplace=True)

for column in self.columns:
for row in range(len(X_copy[column])):
for value in X_copy[column][row]:
if value != None and value in X_copy.columns:
X_copy.loc[row, value] = 1

X_copy.drop(columns=self.columns, inplace=True)

return X_copy

All the custom transformers needed for data processing have been created. Now we can build our first pipeline, bringing all the Transformers to one place.

from sklearn.pipeline import Pipeline

str_pipe = Pipeline(steps=[
("ColumnsSelector", ColumnsSelector(str_columns_to_use)),
("Format", FormatToArrayOfXValues(str_columns_to_use)),
("ValuesToColumns", ValuesToColumns(str_columns_to_use, new_df))
])

Next, we still need to take care of some numeric features we decided to use. The custom transformer, ColumnsSelector, can be used again to select all the numeric columns we want, plus a Transformer from sklearn.preprocessing module, to normalize our data.

from sklearn.preprocessing import MinMaxScaler

num_pipe = Pipeline(steps=[
("ColumnsSelector", ColumnsSelector(num_columns_to_use)),
("Normalize", MinMaxScaler())
])

To join the two pipelines into the same function, we can use ColumnTransformer to perform each preprocessing step into the columns we want and gather them all together.

from sklearn.compose import ColumnTransformer

full_processor = ColumnTransformer(transformers=[
('number', num_pipe, num_columns_to_use),
('category', str_pipe, str_columns_to_use)
])

To run the pipeline, it is simple, just call fit_transform() and we can see our data processed.

example = full_processor.fit_transform(X_train)

It is possible to add a model to our pipeline.

from sklearn.ensemble import RandomForestRegressor

full_pipeline = Pipeline(steps=[
('preprocess', full_processor),
('model', RandomForestRegressor())
])

Hyperparameter tuning is possible also, we can use RandomizedSearchCV or GridSearchCV.

Note: Take care when dealing with indexes in a dataframe and running RandomizedSearch or GridSearch, it can return a lot of errors. That is why we are using some ‘reset_index’ throughout our Transformers.

from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import GridSearchCV

search = RandomizedSearchCV(full_pipeline, params, scoring="neg_mean_squared_error", cv=5, error_score='raise')
_ = search.fit(X_train, y_train)
print('Score:', abs(search.best_score_))

We were able to perform all the necessary process to learn how to use Scikit-learn Pipelines and Transformers.

Thank you so much for reading, and if you enjoyed it and are interested in reading other articles, follow me here on Medium.

If you want to know more about my projects, articles or want to contact me, you can connect with me on LinkedIn or follow my GitHub.

--

--