With Snowpark for Python now in GA, we can run our models closer to where our data lives, and aided by the processing power of Snowflake, we can generate model predictions easily.
In this tutorial, we’re going to take the well-known California Housing Prices dataset and upload it into Snowflake using Snowpark for Python. Then, we will do some quick data exploration, cross-validation, and fine-tuning before deploying our model to Snowflake, so we can generate predictions directly in Snowflake’s data warehouse.
1. Load Necessary Python for Snowpark Packages and Connect to Snowflake
We will begin by importing the necessary libraries. The Session class gives us everything we need to connect to Snowflake, using the same parameters as the connect function in the Snowflake Connector for Python. To create a session, we build a Python dictionary containing the names and values of those parameters; see the example below.
For this example, we placed those parameters in a separate file called “config.py”.
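For reference, a minimal “config.py” might look like the sketch below. The keys mirror the connect() parameters of the Snowflake Connector for Python, and every value shown is a placeholder you would replace with your own account details.
# config.py -- placeholder values only; replace them with your own account details
snowflake_conn_prop = {
    "account": "your_account_identifier",
    "user": "your_username",
    "password": "your_password",
    "role": "SYSADMIN",
    "warehouse": "COMPUTE_WH",
    "database": "DEMO",
    "schema": "TEST",
}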
We import the standard machine learning libraries (pandas, NumPy, and Matplotlib), as well as several Snowpark modules: “functions”, which provides utility and SQL functions for generating column expressions to pass to DataFrame transformation methods; “types”, which provides the data types used to define schemas; and “udf”, for registering user-defined functions.
What’s the purpose of this? To build our predictions in Snowflake.
# Snowflake packages
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import udf, col, is_null
# Python packages
import sys
import cachetools
import os
import pandas as pd
import numpy as np
import io
import joblib
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBRegressor
#Snowflake connection info is saved in config.py
from config import snowflake_conn_prop
Print the Snowpark version:
from snowflake.snowpark import version
print(version.VERSION)
(0, 8, 0)
Now, we’ll load our data using pd.read_csv(), rename the columns, and pull a sample with head().
# Read the housing data
data=pd.read_csv('https://raw.githubusercontent.com/serbre/Snowpark_migrate/main/housing.csv')
data.columns= ['LONGITUDE',
'LATITUDE',
'HOUSING_MEDIAN_AGE',
'TOTAL_ROOMS',
'TOTAL_BEDROOMS',
'POPULATION',
'HOUSEHOLDS',
'MEDIAN_INCOME',
'MEDIAN_HOUSE_VALUE',
'OCEAN_PROXIMITY']
data.head()
The next step is to create a connection. We’ll pass the dictionary containing the names and values of the parameters from config.py to the Session.builder.configs method. This returns a builder object on which we call the create method to establish the connection.
session = Session.builder.configs(snowflake_conn_prop).create()
To create our environment in Snowflake, we define variables with the role, database, schema, and warehouse names, and run the corresponding SQL statements with the session.sql command. You can update these values to match your own Snowflake environment; just make sure your role has the necessary CREATE privileges.
Snowpark operations are lazily evaluated. To trigger a query, you need to call an action method such as collect().
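As a small illustration of this behavior (a hedged sketch, not part of the original pipeline), building a DataFrame only creates a query plan; nothing runs in Snowflake until an action such as collect() is called:
# No query is sent to Snowflake yet -- this only builds a plan
lazy_df = session.sql("select current_timestamp() as now")
# collect() is the action that actually executes the query and returns rows
print(lazy_df.collect())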
rolename = "SYSADMIN"
dbname = "DEMO"
schemaname = "TEST"
warehouse = "COMPUTE_WH"
session.sql(f"USE ROLE {rolename}").collect()
[Row(status='Statement executed successfully.')]
# Create a database, schema, and warehouse
session.sql(f"CREATE DATABASE IF NOT EXISTS {dbname}").collect()
session.sql(f"CREATE SCHEMA IF NOT EXISTS {dbname}.{schemaname}").collect()
session.sql(f"CREATE WAREHOUSE IF NOT EXISTS {warehouse} \
WAREHOUSE_SIZE = 'Large' \
AUTO_SUSPEND = 300 \
AUTO_RESUME = TRUE \
MIN_CLUSTER_COUNT = 1 \
MAX_CLUSTER_COUNT = 3 \
SCALING_POLICY = 'STANDARD' ").collect()
[Row(status='COMPUTE_WH already exists, statement succeeded.')]
session.sql(f"USE WAREHOUSE {warehouse}").collect()
session.sql(f"USE SCHEMA {dbname}.{schemaname}").collect()
[Row(status='Statement executed successfully.')]
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())
[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='DEMO', CURRENT_SCHEMA()='TEST')]
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost')
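If you want to confirm which versions of a package are available in the Snowflake Anaconda channel before adding it, one way, assuming your role can read the shared INFORMATION_SCHEMA.PACKAGES view, is to query it directly:
# Optional check: list the versions of xgboost available server-side
session.sql("select * from information_schema.packages where language = 'python' and package_name = 'xgboost'").collect()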
Now, to demonstrate the whole process, we will load our CSV into Snowflake and use it for our analysis. In your case, the data might already be in Snowflake.
The following variables hold our file name, the names of the stages for the raw file, the model, and the UDF, and the name of the raw table.
The session.file.put command uploads the file to the stage. Please note that, for this exercise, the file needs to be in the same folder as the notebook.
filename = "housing.csv"
stagename = "rawdata"
model_stage = "models"
rawtable = "housing"
udf="udf"
# Create Snowflake stage
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()
# Put the file in the stage
session.file.put(filename,stagename)
[PutResult(source='housing.csv', target='housing.csv.gz', source_size=1423529, target_size=396000, source_compression='NONE', target_compression='GZIP', status='UPLOADED', message='')]
session.sql(f"create or replace stage {model_stage} DIRECTORY = (ENABLE = TRUE) copy_options = (on_error='skip_file');").collect()
session.sql(f"create or replace stage {udf} copy_options = (on_error='skip_file');").collect()
[Row(status='Stage area UDF successfully created.')]
The next step is to define our schema using a StructType that contains the column names and their data types. We then pass it to the schema method of the session.read reader, which returns a reader configured for files containing the specified fields.
With the options method, we define format settings such as the compression, type, field delimiter, and number of header rows to skip. Finally, with the csv method, we point the reader at the staged file so we can create our table.
schema_for_data_file = StructType([
    # The numeric columns in housing.csv are stored as decimals, so DoubleType avoids truncating them
    StructField("longitude", DoubleType()),
    StructField("latitude", DoubleType()),
    StructField("housing_median_age", DoubleType()),
    StructField("total_rooms", DoubleType()),
    StructField("total_bedrooms", DoubleType()),
    StructField("population", DoubleType()),
    StructField("households", DoubleType()),
    StructField("median_income", DoubleType()),
    StructField("median_house_value", DoubleType()),
    StructField("ocean_proximity", StringType())])
format_options={"compression": "gzip", "type" : "csv", "field_delimiter" : ",","skip_header": 1}
csv_reader=csv_reader.options(format_options)
df = csv_reader.csv("@rawdata/housing.csv.gz")
# In case you already have a table there
drop_result = session.sql(f"drop table if exists {rawtable}").collect()
drop_result
[Row(status='HOUSING successfully dropped.')]
df_table = df.copy_into_table("housing", format_type_options=format_options, force=True)
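As a quick, optional sanity check that the copy succeeded, you can count the rows that landed in the table (the original California Housing dataset has 20,640 rows):
# Row count of the newly loaded table -- should match the source CSV
session.sql(f"select count(*) from {rawtable}").collect()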
%%time
housing_snowflake = session.table('housing')
pd.set_option('display.max_columns', None)
data = housing_snowflake.toPandas()
data.head()
CPU times: user 429 ms, sys: 32 ms, total: 461 ms Wall time: 3.5 s
data.info()
2. Perform Exploratory Data Analysis (EDA)
Now that we have our data in Snowflake, we can decide if we want to work with Snowpark Python or with our preferred Python native libraries. Snowpark offers the ability to run most of the analysis in Snowflake, preventing data from leaving your database.
We’ll perform a quick visual EDA starting with a histogram on numeric variables to see how our data is represented.
At first glance, we observe that the attributes have very different ranges. With that in mind, we’ll apply feature scaling later in the transformation pipelines.
data.hist(bins=50, figsize=(20,15))
plt.show()
cat_vars = [var for var in data.columns if ((data[var].dtype == 'O') or (data[var].dtype=="bool"))]
print('Categorical Variables:' , cat_vars)
num_vars = [var for var in data.columns if var not in cat_vars]
print('Numerical Variables:', num_vars)
Categorical Variables: ['OCEAN_PROXIMITY']
Numerical Variables: ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS', 'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME', 'MEDIAN_HOUSE_VALUE']
Because median income is an important predictor of median housing prices, we’ll create a new attribute called INCOME_CAT with pd.cut(). This defines 5 categories (labeled from 1 to 5) based on MEDIAN_INCOME cut points.
data["INCOME_CAT"] = pd.cut(data["MEDIAN_INCOME"],
bins=[0., 1.5, 3.0, 4.5, 6., np.inf],
labels=[1, 2, 3, 4, 5])
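To sanity-check the binning, an optional step not in the original flow, you can look at how the rows are distributed across the five categories, since the stratified split later relies on these proportions:
# Share of rows in each income category
print(data["INCOME_CAT"].value_counts(normalize=True).sort_index())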
At this point, we need to clean our data. Null values can cause problems, and below we can see that TOTAL_BEDROOMS and INCOME_CAT have a lower non-null count than the rest of the dataset. Let’s turn our pandas DataFrame back into a Snowpark DataFrame and handle those null values.
data.info()
data_no_nulls=session.create_dataframe(data)
The variables TOTAL_BEDROOMS and INCOME_CAT have 207 and 12 null values, respectively.
bedrooms_null=data_no_nulls.filter(is_null(col('TOTAL_BEDROOMS'))).count()
income_cat_null=data_no_nulls.filter(is_null(col('INCOME_CAT'))).count()
print(f"Total Bedrooms nulls: {bedrooms_null} \nIncome cat nulls: {income_cat_null}")
Total Bedrooms nulls: 207
Income cat nulls: 12
In this case, we’re going to drop the rows with nulls. Please note that imputing the mean instead is also an option if the dataset is small and you need every row.
data_no_nulls=data_no_nulls.na.drop()
data_no_nulls.toPandas().info()
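If you would rather keep every row, a hedged sketch of mean imputation with Snowpark, assuming the na.fill API is available in your Snowpark version, could look like this instead of the drop above:
# Alternative to dropping rows: fill TOTAL_BEDROOMS nulls with the column mean
data_with_nulls = session.create_dataframe(data)
mean_bedrooms = data_with_nulls.select(F.avg(col("TOTAL_BEDROOMS"))).collect()[0][0]
data_imputed = data_with_nulls.na.fill({"TOTAL_BEDROOMS": float(mean_bedrooms)})
# (The INCOME_CAT nulls would still need separate handling)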
Now, we can load it to Snowflake without any null values.
data_no_nulls.write.mode("overwrite").save_as_table("Data_no_nulls")
3. Training
We’re getting into the fun part! We can test different machine learning algorithms and see how they perform on our data. To do so, we’ll create a Python function that encapsulates everything we need to train our model in Snowflake.
The save_file function writes our trained model to a Snowflake stage as a .joblib file, so we can load it later and use it for inference.
def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn._cursor.upload_stream(input_stream, path)
    return "successfully created file: " + path
4. Deploy the Model in Snowflake
1. Data load
First, we create a selection dictionary with the models we want to test; in this case, we include XGBoost and Random Forest. We then take our data, already cleaned of null values, and convert it to a pandas DataFrame.
2. Split the Train and Test Set
We perform a stratified split of our data based on INCOME_CAT, so we can be sure that our test set has almost the same proportion of income categories as the full dataset. For more information on why this is important, please check out pages 57 and 58 of Hands-On Machine Learning with Scikit-Learn, Keras and TensorFlow, listed in the references at the end of this tutorial. Finally, we save the train and test sets and extract the data labels.
3. Create a Pipeline for Numerical and Categorical Features
This is where the transformation process takes place. We separate our numerical and categorical features to prepare them for training: the numerical features are scaled so that all variables are on the same scale, and the categorical features go through a one-hot encoder that converts categories into numbers. Both steps are then combined into a single preprocessing pipeline.
4. Predict on the Test Set and Return the Root Mean Squared Error (RMSE)
In this step, we use cross-validation and RandomizedSearchCV to find good parameters for our algorithm, and the final step saves the fitted model to the MODELS stage as a .joblib file. The full_pipeline applies the same preprocessing it learned on the training data to every fold before fitting the model. In this case, we used the Random Forest Regressor.
def train_model(session: snowflake.snowpark.Session, table: str, params_distr: dict, algorithm: str) -> float:
    #1. Ingestion
    # Create a selection to pick the models we want to test; in this case XGBoost and Random Forest
    models_select = {}
    xgb = XGBRegressor(nthreads=1, random_state=42)
    forest = RandomForestRegressor(random_state=42)
    models_select["xgb"] = xgb
    models_select["forest"] = forest

    # Load the table without null values
    data_no_nulls = session.table(table)
    data = data_no_nulls.to_pandas()

    #2. Split the train and test set
    split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    for train_index, test_index in split.split(data, data["INCOME_CAT"]):
        strat_train_set = data.iloc[train_index]
        strat_test_set = data.iloc[test_index]
    #strat_train_set, strat_test_set = data.random_split([0.8, 0.2], seed=82)

    # Save the train and test sets as tables in Snowflake
    session.create_dataframe(strat_train_set).write.mode("overwrite").save_as_table("HOUSING_TRAIN")
    session.create_dataframe(strat_test_set).write.mode("overwrite").save_as_table("HOUSING_TEST")

    #3. Create a pipeline for numerical and categorical features
    data = strat_train_set.drop(['MEDIAN_HOUSE_VALUE', 'INCOME_CAT'], axis=1)
    data_labels = strat_train_set["MEDIAN_HOUSE_VALUE"].copy()
    housing_test = strat_test_set.drop("MEDIAN_HOUSE_VALUE", axis=1)
    housing_test_labels = strat_test_set["MEDIAN_HOUSE_VALUE"].copy()

    # Numerical features
    housing_num = data.drop("OCEAN_PROXIMITY", axis=1)

    # Create a pipeline for numerical features
    num_pipeline = Pipeline([
        ('std_scaler', StandardScaler()),
    ])
    data_num_tr = num_pipeline.fit_transform(housing_num)

    num_attribs = list(housing_num)
    cat_attribs = ["OCEAN_PROXIMITY"]

    preprocessor = ColumnTransformer([
        ("num", num_pipeline, num_attribs),
        ("cat", OneHotEncoder(), cat_attribs),
    ])
    #data_prepared = preprocessor.fit_transform(data)

    full_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('model', models_select[algorithm]),
    ])

    #4. Predict on the test set and return the root mean squared error (RMSE)
    # Repeated k-fold cross-validation; 10 folds with 3 repeats is good practice, here we use 5 folds for faster execution
    cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
    random_search = RandomizedSearchCV(full_pipeline, param_distributions=params_distr,
                                       n_iter=50, cv=cv, scoring='neg_mean_squared_error',
                                       verbose=1, random_state=42)
    random_search.fit(data, data_labels)

    #5. Save the model
    save_file(session, random_search, "@MODELS/housing_reg.joblib")
    negative_mse = random_search.best_score_
    rmse = np.sqrt(-negative_mse)
    return rmse
params_distr = {'model__n_estimators': [int(x) for x in np.linspace(start = 1, stop = 20, num = 20)],
'model__max_features': ['auto', 'sqrt'],
'model__max_depth': [int(x) for x in np.linspace(10, 120, num = 12)],
'model__min_samples_split': [2, 6, 10],
'model__min_samples_leaf':[1, 3, 4],
'model__bootstrap': [True, False]}
table="Data_no_nulls"
# Create an instance of StoredProcedure using the sproc() function
train_model_sp = sproc(train_model, replace=True, session=session)
train_model_sp(table,params_distr, "forest", session=session)
59098.024468258394
Random Forest gives us a prediction error of about $59,098. Although this isn’t a satisfactory result, we’re going to leave it there. When working with any dataset, try different algorithms and features to see which combination gives you the best results. The function built previously can be expanded to include other algorithms besides Random Forest.
Note: At the time this article was written, executing XGBoost on a standard warehouse would eventually exhaust its memory, but with the GA release of Snowpark, Snowflake also announced Snowpark-optimized warehouses. These are recommended for workloads with large memory requirements, such as ML training. XGBoost is already wired into the function above, so give it a try and add your own algorithms as well.
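Since the stored procedure already knows about the “xgb” key, trying XGBoost is mostly a matter of passing a parameter grid that matches XGBRegressor. The grid below is only an illustrative sketch, not a tuned search space, and the call is commented out because it needs a warehouse with enough memory:
# Hypothetical search space for the "xgb" branch of train_model
xgb_params_distr = {'model__n_estimators': [50, 100, 200],
                    'model__max_depth': [3, 5, 7],
                    'model__learning_rate': [0.01, 0.1, 0.3],
                    'model__subsample': [0.8, 1.0]}
# Uncomment on a Snowpark-optimized (or sufficiently large) warehouse
# train_model_sp(table, xgb_params_distr, "xgb", session=session)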
import sys
import cachetools
import os
session.add_import("@MODELS/housing_reg.joblib")
@cachetools.cached(cache={})
def read_file(filename):
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m
The @udf decorator packages the function as a Snowflake Python UDF and stores it in the internal stage we named “udf”. The function is registered as “predict”. It calls the read_file function defined above to load the model into the variable “m”, builds a single-row DataFrame from the local variables, and runs the model’s predict method on that row.
from snowflake.snowpark.functions import udf
features = ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME', 'OCEAN_PROXIMITY']
@udf(name='predict', is_permanent=True, stage_location='@udf', replace=True, session=session)
def predict(LONGITUDE: float, LATITUDE: float, HOUSING_MEDIAN_AGE: float, TOTAL_ROOMS: float,
            TOTAL_BEDROOMS: float, POPULATION: float, HOUSEHOLDS: float, MEDIAN_INCOME: float,
            OCEAN_PROXIMITY: str) -> float:
    m = read_file('housing_reg.joblib')
    row = pd.DataFrame([locals()], columns=features)
    return m.predict(row)[0]
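Because the UDF is registered as permanent, it can also be called from plain SQL rather than through Snowpark. A hedged sketch against the HOUSING_TEST table created earlier:
# Calling the permanent UDF directly from SQL
session.sql("""
    select predict(LONGITUDE, LATITUDE, HOUSING_MEDIAN_AGE, TOTAL_ROOMS,
                   TOTAL_BEDROOMS, POPULATION, HOUSEHOLDS, MEDIAN_INCOME,
                   OCEAN_PROXIMITY) as PREDICTION
    from HOUSING_TEST limit 5
""").collect()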
We load the HOUSING_TEST table we created earlier into snowdf_test. Since we want to predict MEDIAN_HOUSE_VALUE, we drop that column along with the INCOME_CAT column we created earlier. Once that is done, we call our predict UDF.
from snowflake.snowpark import functions as F
snowdf_test = session.table("HOUSING_TEST")
inputs = snowdf_test.drop("MEDIAN_HOUSE_VALUE", "INCOME_CAT")
snowdf_results = snowdf_test.select(
*inputs,
predict(*inputs).alias('PREDICTION'),
(F.col('MEDIAN_HOUSE_VALUE')).alias('ACTUAL_LABEL')
).limit(20)
snowdf_results.to_pandas()
And finally, we put the ACTUAL_LABEL next to the PREDICTION.
snowdf_results.select(snowdf_results.PREDICTION,snowdf_results.ACTUAL_LABEL).to_pandas()
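For a rough quantitative check on this sample, you can also compute the RMSE of these rows locally; keep in mind it only covers the 20 rows we selected, not the full test set:
# RMSE over the 20 sampled rows -- a rough sanity check only
res = snowdf_results.select(snowdf_results.PREDICTION, snowdf_results.ACTUAL_LABEL).to_pandas()
print(np.sqrt(mean_squared_error(res["ACTUAL_LABEL"], res["PREDICTION"])))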
And this is the view we get when we save our predictions as a Housing_Detection table in Snowflake.
Too Long, Didn’t Read:
In this tutorial, you learned how to take an already available data science model and migrate it to Snowflake using Snowpark for Python.
Some of the points we covered here were:
- How to connect to Snowflake
- How to load data using Snowpark functions such as copy_into_table
- How to connect to your data that already lives in Snowflake
- How to perform predictive data science modelling on your data, including EDA, data cleaning, cross-validation, and fine-tuning
- How to deploy the model in Snowflake with Python UDFs
We hope you enjoyed the tutorial as much as we did creating it! As a closing thought, if you no longer need the session for executing queries and want to cancel any that are still running, calling the close method on the Session object is a good way to go.
session.close()
How Hakkoda Can Help
Our highly trained team of experts at Hakkoda can help you move to a modern data stack using the latest functionalities and tools. We can put in place a data solution that suits your objectives; our knowledge and expertise take care of the rest.
At Hakkoda, we’ll leverage data engineering, data science, and even data-driven full-stack application development. To start your data innovation journey with state-of-the-art data solutions, contact us today.
More information and references
Snowpark Developer Guide for Python: https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html
Snowpark-optimized Warehouses: https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized.html#billing-for-snowpark-optimized-warehouses
Hands-On Machine Learning with Scikit-Learn, Keras and TensorFlow by Aurélien Géron (O’Reilly). Copyright 2019 Aurélien Géron, 978-1-492-03264-9.