Snowpark for Python: Migrating Your Data Science Projects

With the release of Snowpark for Python, we can run models using Snowflake's processing power. This tutorial will show you how.
November 15, 2022

With Snowpark for Python now in general availability, we have the ability to run our models closer to where our data lives. Aided by Snowflake's processing power, we can generate model predictions easily.

In this tutorial, we’re going to take the California Housing Prices dataset, which is fairly well-known, and upload it into Snowflake using Snowpark for Python. Then, we will do a quick data exploration, cross-validation, and fine-tuning to deploy our model into Snowflake. This way, we can get our predictions using Snowflake’s data warehouse.

1. Load the Necessary Snowpark for Python Packages and Connect to Snowflake

We will begin by importing the necessary libraries. The Session class gives us everything we need to connect to Snowflake, using the same parameters as the connect function in the Snowflake Connector for Python. To create a session, we build a Python dictionary containing the names and values of those parameters; see the example below.

For this example, we placed those parameters in a different file called “config.py”.
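For reference, here is a minimal sketch of what “config.py” might contain. The keys follow the connection parameters of the Snowflake Connector for Python; the values are placeholders you would replace with your own account details.

# config.py -- Snowflake connection parameters (placeholder values)
snowflake_conn_prop = {
    "account": "<your_account_identifier>",
    "user": "<your_username>",
    "password": "<your_password>",
    "role": "SYSADMIN",
    "database": "DEMO",
    "schema": "TEST",
    "warehouse": "COMPUTE_WH",
}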

Let’s begin by importing the standard machine learning libraries like pandas, NumPy, and Matplotlib, as well as several Snowpark modules such as “functions”, which provides utility and SQL functions for generating column expressions to pass to DataFrame transformation methods, along with the data types and the UDF decorator.

What’s the purpose of this? To build our predictions in Snowflake. 

				
# Snowflake packages
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import udf, col, is_null

# Python packages
import sys
import cachetools
import os
import pandas as pd
import numpy as np
import io
import joblib
%matplotlib inline
import matplotlib.pyplot as plt
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import RepeatedKFold
from sklearn.model_selection import StratifiedShuffleSplit
from sklearn.model_selection import RandomizedSearchCV
from xgboost import XGBRegressor

# Snowflake connection info is saved in config.py
from config import snowflake_conn_prop
				
			

Print Snowpark version.

				
from snowflake.snowpark import version
print(version.VERSION)
				
			

(0, 8, 0)

Now, we’ll load our data using pd.read_csv() and pull a sample.

				
# Read the housing data
data = pd.read_csv('https://raw.githubusercontent.com/serbre/Snowpark_migrate/main/housing.csv')

data.columns = ['LONGITUDE',
                'LATITUDE',
                'HOUSING_MEDIAN_AGE',
                'TOTAL_ROOMS',
                'TOTAL_BEDROOMS',
                'POPULATION',
                'HOUSEHOLDS',
                'MEDIAN_INCOME',
                'MEDIAN_HOUSE_VALUE',
                'OCEAN_PROXIMITY']
				
			
				
data.head()
				
			
[Image: output of data.head() on the housing DataFrame]

The next step is to create a connection. We’ll pass the dictionary containing the names and values of the parameters from config.py to the Session.builder.configs method. This returns a builder object on which we call the create method to establish the connection.

				
session = Session.builder.configs(snowflake_conn_prop).create()
				
			

To create our environment in Snowflake, we define variables holding the role, database, schema, and warehouse names and run the corresponding SQL statements using the session.sql command. You can update these values to match your own Snowflake environment; just make sure your role has the necessary CREATE privileges in your database.

Snowpark operations are lazy: to trigger a query, you need to call an action method such as collect().
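As a small illustration (not part of the original notebook), session.sql only defines a query until an action runs it:

# session.sql() returns a lazily evaluated DataFrame; nothing is executed yet
current_role_df = session.sql("select current_role()")

# collect() is an action, so this is the point where the query actually runs in Snowflake
print(current_role_df.collect())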

				
					rolename = "SYSADMIN"
dbname = "DEMO"
schemaname = "TEST"
warehouse = "COMPUTE_WH"

session.sql(f"USE ROLE {rolename}").collect()
				
			

[Row(status=’Statement executed successfully.’)]

				
# Create a database, schema, and warehouse
session.sql(f"CREATE DATABASE IF NOT EXISTS {dbname}").collect()
session.sql(f"CREATE SCHEMA IF NOT EXISTS {dbname}.{schemaname}").collect()
session.sql(f"CREATE WAREHOUSE IF NOT EXISTS {warehouse} \
                WAREHOUSE_SIZE = 'Large' \
                AUTO_SUSPEND = 300 \
                AUTO_RESUME = TRUE \
                MIN_CLUSTER_COUNT = 1 \
                MAX_CLUSTER_COUNT = 3 \
                SCALING_POLICY = 'STANDARD' ").collect()
				
			

[Row(status=’COMPUTE_WH already exists, statement succeeded.’)]

				
					session.sql(f"USE WAREHOUSE {warehouse}").collect()
session.sql(f"USE SCHEMA {dbname}.{schemaname}").collect()
				
			

[Row(status=’Statement executed successfully.’)]

				
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())
				
			

[Row(CURRENT_WAREHOUSE()=’COMPUTE_WH’, CURRENT_DATABASE()=’DEMO’, CURRENT_SCHEMA()=’TEST’)]

				
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost')
				
			

Now, to explain the whole process, we will load our CSV into Snowflake and use it for our analysis. In your case, the data might already be in Snowflake.

The following variables hold our file name and, following the housing.csv example, the names of the stages for the raw file, the model, and the UDF, as well as the name of the raw table.

The session.file.put command uploads the file to the stage. Please note that, for this exercise, the file must be in the same folder as the notebook.

				
					filename = "housing.csv"
stagename = "rawdata"
model_stage = "models"
rawtable = "housing"
udf="udf"
				
			
				
# Create the Snowflake stage
session.sql(f"create or replace stage {stagename} DIRECTORY = (ENABLE = TRUE);").collect()

# Put the file in the stage
session.file.put(filename, stagename)
				
			

[PutResult(source=’housing.csv’, target=’housing.csv.gz’, source_size=1423529, target_size=396000, source_compression=’NONE’, target_compression=’GZIP’, status=’UPLOADED’, message=’’)]

				
					session.sql(f"create or replace stage {model_stage} DIRECTORY = (ENABLE = TRUE) copy_options = (on_error='skip_file');").collect()
session.sql(f"create or replace stage {udf} copy_options = (on_error='skip_file');").collect()
				
			

[Row(status=’Stage area UDF successfully created.’)]

The next step is to define our schema using a StructType containing the column names and their data types. We then pass it to the reader’s schema method, which returns an object configured to read files containing the specified fields.

With the options method, we define format settings such as compression, field delimiter, and file type. Finally, with the csv method, we point the reader at the location of our staged file so we can create our table.

				
# All numeric columns in housing.csv contain decimal values, so we declare them as floats
schema_for_data_file = StructType([
        StructField("longitude", FloatType()),
        StructField("latitude", FloatType()),
        StructField("housing_median_age", FloatType()),
        StructField("total_rooms", FloatType()),
        StructField("total_bedrooms", FloatType()),
        StructField("population", FloatType()),
        StructField("households", FloatType()),
        StructField("median_income", FloatType()),
        StructField("median_house_value", FloatType()),
        StructField("ocean_proximity", StringType())])

csv_reader = session.read.schema(schema_for_data_file)

format_options = {"compression": "gzip", "type": "csv", "field_delimiter": ",", "skip_header": 1}
csv_reader = csv_reader.options(format_options)

df = csv_reader.csv("@rawdata/housing.csv.gz")
				
			
				
# In case you already have a table there
drop_result = session.sql(f"drop table if exists {rawtable}").collect()
drop_result
				
			

[Row(status=’HOUSING successfully dropped.’)]

				
df_table = df.copy_into_table("housing", format_type_options=format_options, force=True)
				
			
				
%%time

housing_snowflake = session.table('housing')

pd.set_option('display.max_columns', None)

data = housing_snowflake.to_pandas()
data.head()
				
			

CPU times: user 429 ms, sys: 32 ms, total: 461 ms Wall time: 3.5 s

[Image: first rows of the housing table loaded from Snowflake]
				
data.info()
				
			
[Image: output of data.info()]

2. Perform Exploratory Data Analysis (EDA)

Now that we have our data in Snowflake, we can decide whether we want to work with Snowpark for Python or with our preferred native Python libraries. Snowpark offers the ability to run most of the analysis inside Snowflake, so the data never has to leave your database.
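For instance, if we prefer to keep part of the exploration inside Snowflake instead of pulling everything into pandas, we can aggregate directly on the Snowpark DataFrame. A small sketch using the housing table we loaded above:

# Average house value per OCEAN_PROXIMITY category, computed inside Snowflake
housing_snowflake.group_by("OCEAN_PROXIMITY") \
    .agg(F.avg("MEDIAN_HOUSE_VALUE").alias("AVG_HOUSE_VALUE")) \
    .sort(F.col("AVG_HOUSE_VALUE").desc()) \
    .show()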

We’ll perform a quick visual EDA starting with a histogram on numeric variables to see how our data is represented.

At first glance, we observe that the attributes have very different ranges. Using this insight, we’ll apply feature scaling later through transformation pipelines.

				
data.hist(bins=50, figsize=(20,15))
plt.show()
				
			
[Image: histograms of the numeric variables]
				
cat_vars = [var for var in data.columns if ((data[var].dtype == 'O') or (data[var].dtype == "bool"))]
print('Categorical Variables:', cat_vars)

num_vars = [var for var in data.columns if var not in cat_vars]
print('Numerical Variables:', num_vars)
				
			

Categorical Variables: [‘OCEAN_PROXIMITY’] Numerical Variables: [‘LONGITUDE’, ‘LATITUDE’, ‘HOUSING_MEDIAN_AGE’, ‘TOTAL_ROOMS’, ‘TOTAL_BEDROOMS’, ‘POPULATION’, ‘HOUSEHOLDS’, ‘MEDIAN_INCOME’, ‘MEDIAN_HOUSE_VALUE’]

Because median income is an important predictor for median housing prices, we’ll create a new attribute with pd.cut() called INCOME_CAT. This defines 5 categories (labeled from 1 to 5) based on MEDIAN_INCOME cuts.

				
					data["INCOME_CAT"] = pd.cut(data["MEDIAN_INCOME"],
 bins=[0., 1.5, 3.0, 4.5, 6., np.inf],
 labels=[1, 2, 3, 4, 5])
				
			

At this point, we need to clean our data, since null values can cause problems. Below we can see that TOTAL_BEDROOMS and INCOME_CAT have a lower non-null count than the rest of the dataset. Let’s turn our pandas DataFrame back into a Snowpark DataFrame and handle those null values.

				
data.info()
				
			
[Image: data.info() output showing non-null counts]
				
data_no_nulls = session.create_dataframe(data)
				
			

Variables TOTAL_BEDROOMS and INCOME_CAT have 207 and 12 null values respectively.

				
bedrooms_null = data_no_nulls.filter(is_null(col('TOTAL_BEDROOMS'))).count()
income_cat_null = data_no_nulls.filter(is_null(col('INCOME_CAT'))).count()

print(f"Total Bedrooms nulls: {bedrooms_null} \nIncome cat nulls: {income_cat_null}")
				
			

Total Bedrooms nulls: 207

Income cat nulls: 12

In this case, we’re going to drop the rows with nulls. Please note that imputing the mean instead is also an option if the dataset is small and you need every row; a sketch of that approach is shown below.
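If you’d rather impute than drop, a minimal sketch with Snowpark could look like the following. It only fills TOTAL_BEDROOMS and is an assumption of this example, not part of the original flow (note that data_no_nulls, despite its name, still contains nulls at this point).

# Compute the mean of TOTAL_BEDROOMS in Snowflake, then fill the nulls with it
mean_bedrooms = data_no_nulls.select(F.avg(col("TOTAL_BEDROOMS"))).collect()[0][0]
data_imputed = data_no_nulls.na.fill({"TOTAL_BEDROOMS": float(mean_bedrooms)})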

				
data_no_nulls = data_no_nulls.na.drop()
				
			
				
data_no_nulls.to_pandas().info()
				
			
[Image: info() output after dropping nulls]

Now, we can load it to Snowflake without any null values.

				
					data_no_nulls.write.mode("overwrite").save_as_table("Data_no_nulls")
				
			

3. Training

 

We’re getting into the fun part! We can test different machine learning algorithms and see how they perform on our data. To do so, we’ll create a Python function that encapsulates everything we need to train our model in Snowflake.

The save_file function will save our model to a Snowflake stage as a .joblib file, so we can load it and use it for inference after training.

				
def save_file(session, model, path):
    # Serialize the model to an in-memory buffer and upload it to the given stage path
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn._cursor.upload_stream(input_stream, path)
    return "successfully created file: " + path
				
			

4. Deploy the Model in Snowflake

The train_model function, as its name indicates, trains our model in Snowflake using the parameters we choose. The function is divided into four parts:

1. Data load

First, we create a selection of the models we want to test; in this case, we include XGBoost and Random Forest. We then take our data, cleaned of null values, and load it as a pandas DataFrame.

2. Split the Train and Test Set

We perform a stratified split of our data based on INCOME_CAT, so we can be sure that our test set has almost the same proportion of income categories as the full dataset. For more information on why this is important, please check out pages 57 and 58 of the second reference we included at the end of this tutorial. Finally, we save the train and test sets and get the data labels. (A quick way to verify the stratification is sketched after the training run below.)

3. Create a Pipeline for Numerical and Categorical Features

This is where the transformation process takes place. We separate our numerical and categorical features to prepare them for training: feature scaling brings all numeric variables onto the same scale, a one-hot encoder converts categories into numbers, and both steps are combined in a preprocessing pipeline.

4. Predict on the Test Set and Return the Root Mean Squared Error (RMSE)

In this step, we use cross-validation and RandomizedSearchCV to find good hyperparameters for our algorithm, and the final step saves the model to the MODELS stage as a .joblib file. At prediction time, the fitted pipeline applies the same transformations that were learned on the training set. In this case, we used the Random Forest Regressor.

				
def train_model(session: snowflake.snowpark.Session, table: str, params_distr: dict, algorithm: str) -> float:
    # 1. Ingestion
    # Create a selection of the models we want to test; here we include XGBoost and Random Forest
    models_select = {}
    xgb = XGBRegressor(nthreads=1, random_state=42)
    forest = RandomForestRegressor(random_state=42)
    models_select["xgb"] = xgb
    models_select["forest"] = forest

    # Load the table without null values
    data_no_nulls = session.table(table)
    data = data_no_nulls.to_pandas()

    # 2. Split the train and test set (stratified on INCOME_CAT)
    split = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    for train_index, test_index in split.split(data, data["INCOME_CAT"]):
        strat_train_set = data.iloc[train_index]
        strat_test_set = data.iloc[test_index]

    # Save the train and test sets as tables in Snowflake
    session.create_dataframe(strat_train_set).write.mode("overwrite").save_as_table("HOUSING_TRAIN")
    session.create_dataframe(strat_test_set).write.mode("overwrite").save_as_table("HOUSING_TEST")

    # 3. Create a pipeline for numerical and categorical features
    data = strat_train_set.drop(['MEDIAN_HOUSE_VALUE', 'INCOME_CAT'], axis=1)
    data_labels = strat_train_set["MEDIAN_HOUSE_VALUE"].copy()
    housing_test = strat_test_set.drop("MEDIAN_HOUSE_VALUE", axis=1)
    housing_test_labels = strat_test_set["MEDIAN_HOUSE_VALUE"].copy()

    # Numerical features
    housing_num = data.drop("OCEAN_PROXIMITY", axis=1)
    # Pipeline for numerical features
    num_pipeline = Pipeline([
        ('std_scaler', StandardScaler()),
    ])
    data_num_tr = num_pipeline.fit_transform(housing_num)

    num_attribs = list(housing_num)
    cat_attribs = ["OCEAN_PROXIMITY"]

    preprocessor = ColumnTransformer([
        ("num", num_pipeline, num_attribs),
        ("cat", OneHotEncoder(), cat_attribs),
    ])
    full_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('model', models_select[algorithm]),
    ])

    # 4. Search hyperparameters with cross-validation and return the root mean squared error (RMSE)
    # Repeated k-fold cross-validation is good practice (e.g. 10 folds, 3 repeats); we use 5 folds for faster execution
    cv = RepeatedKFold(n_splits=5, n_repeats=3, random_state=1)
    random_search = RandomizedSearchCV(full_pipeline, param_distributions=params_distr,
                                       n_iter=50, cv=cv, scoring='neg_mean_squared_error',
                                       verbose=1, random_state=42)

    random_search.fit(data, data_labels)

    # 5. Save the model to the MODELS stage
    save_file(session, random_search, "@MODELS/housing_reg.joblib")
    negative_mse = random_search.best_score_
    rmse = np.sqrt(-negative_mse)
    return rmse
				
			
				
params_distr = {'model__n_estimators': [int(x) for x in np.linspace(start=1, stop=20, num=20)],
                'model__max_features': ['auto', 'sqrt'],
                'model__max_depth': [int(x) for x in np.linspace(10, 120, num=12)],
                'model__min_samples_split': [2, 6, 10],
                'model__min_samples_leaf': [1, 3, 4],
                'model__bootstrap': [True, False]}

table = "Data_no_nulls"
				
			
				
# Create an instance of StoredProcedure using the sproc() function
train_model_sp = sproc(train_model, replace=True, session=session)

				
			
				
train_model_sp(table, params_distr, "forest", session=session)
				
			

59098.024468258394
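As a quick sanity check on the stratified split described in step 2, we can compare the income-category proportions of the full dataset with those of the test set now that train_model has written the HOUSING_TEST table (a small sketch):

# Income-category proportions should be nearly identical in the full data and the test set
full_cat = session.table("Data_no_nulls").to_pandas()["INCOME_CAT"]
test_cat = session.table("HOUSING_TEST").to_pandas()["INCOME_CAT"]

print(full_cat.value_counts(normalize=True).sort_index())
print(test_cat.value_counts(normalize=True).sort_index())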

Random Forest gives us a prediction error of about $59,098. Although this isn’t a satisfactory result, we’re going to leave it there. When working with any dataset, test different algorithms and features to see which combination gives you the best results. The function built above can be expanded to include other algorithms besides Random Forest.

Note: At the time this article was written, executing XGBoost on a standard warehouse would eventually exhaust its memory. With the GA release of Snowpark, Snowflake also announced Snowpark-optimized warehouses, which are recommended for workloads with large memory requirements such as ML training. XGBoost is already included in the model selection above, so give it a try (see the sketch below) and add your own algorithms as well.
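If you want to try XGBoost through the same stored procedure, a hedged sketch of a parameter distribution and call might look like this (the grid values are illustrative, and a Snowpark-optimized warehouse may be needed for the extra memory):

# Illustrative XGBoost search space; keys keep the 'model__' prefix used by the pipeline
params_distr_xgb = {
    'model__n_estimators': [50, 100, 200],
    'model__max_depth': [3, 5, 7],
    'model__learning_rate': [0.01, 0.05, 0.1],
    'model__subsample': [0.7, 0.85, 1.0],
}

train_model_sp(table, params_distr_xgb, "xgb", session=session)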

Let’s assume our model is trained and tuned and we’re satisfied with the accuracy. In this case, let’s proceed to deploy it to Snowflake using Python UDFs.
 
The Snowpark library uploads and executes UDFs on the server. If your UDF has a dependency or needs to read data from a file, you need to call Session.add_import(). In this case, we add the model file from the MODELS stage. The next step is to create a function that reconstructs a Python object from the file persisted with joblib.dump, mirroring what we did in the save_file function.
				
import sys
import cachetools
import os

session.add_import("@MODELS/housing_reg.joblib")

@cachetools.cached(cache={})
def read_file(filename):
    # Load the model from the UDF's import directory, caching it so the file is read only once
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m
				
			

The @udf decorator packages the function as a Snowflake Python UDF and stores it in the internal stage named udf. The function name will be “predict”. It calls the read_file function above, loads the model into the variable m, builds a single-row DataFrame from the local variables, and runs the model’s predict on that row.

				
from snowflake.snowpark.functions import udf

features = ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
            'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME', 'OCEAN_PROXIMITY']

@udf(name='predict', is_permanent=True, stage_location='@udf', replace=True, session=session)
def predict(LONGITUDE: float, LATITUDE: float, HOUSING_MEDIAN_AGE: float, TOTAL_ROOMS: float,
            TOTAL_BEDROOMS: float, POPULATION: float, HOUSEHOLDS: float, MEDIAN_INCOME: float,
            OCEAN_PROXIMITY: str) -> float:
    m = read_file('housing_reg.joblib')
    row = pd.DataFrame([locals()], columns=features)
    return m.predict(row)[0]
				
			

We load the HOUSING_TEST table we created before into snowdf_test. Since we want to predict MEDIAN_HOUSE_VALUE, we drop that column along with INCOME_CAT, the one we created earlier. Once that is done, we call our predict UDF.

				
from snowflake.snowpark import functions as F

snowdf_test = session.table("HOUSING_TEST")
inputs = snowdf_test.drop("MEDIAN_HOUSE_VALUE", "INCOME_CAT")
snowdf_results = snowdf_test.select(*inputs,
                                    predict(*inputs).alias('PREDICTION'),
                                    F.col('MEDIAN_HOUSE_VALUE').alias('ACTUAL_LABEL')
                                    ).limit(20)
				
			
				
snowdf_results.to_pandas()
				
			
[Image: sample of snowdf_results output]

And finally, we compare PREDICTION against ACTUAL_LABEL.

				
snowdf_results.select(snowdf_results.PREDICTION, snowdf_results.ACTUAL_LABEL).to_pandas()
				
			
[Image: sample of PREDICTION versus ACTUAL_LABEL output]

And this is the view we get when we place a HOUSING_DETECTION table with our predictions in Snowflake.

[Image: the predictions table viewed in Snowflake]
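For reference, a table like that can be written straight from the results DataFrame, and the permanent UDF can also be called from SQL. A minimal sketch (the HOUSING_DETECTION name is just illustrative):

# Persist the scored rows as a table in Snowflake
snowdf_results.write.mode("overwrite").save_as_table("HOUSING_DETECTION")

# The permanent predict UDF can also be called directly from SQL
session.sql("""
    select predict(LONGITUDE, LATITUDE, HOUSING_MEDIAN_AGE, TOTAL_ROOMS,
                   TOTAL_BEDROOMS, POPULATION, HOUSEHOLDS, MEDIAN_INCOME,
                   OCEAN_PROXIMITY) as PREDICTION,
           MEDIAN_HOUSE_VALUE as ACTUAL_LABEL
    from HOUSING_TEST
    limit 10
""").collect()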

Too Long, Didn’t Read:

In this tutorial, you learned how to take an already available data science model and migrate it to Snowflake using Snowpark for Python.

Some of the points we covered here were:

  • How to connect to Snowflake
  • How to load data using Snowpark functions like copy_into_table
  • How to connect to your data in Snowflake
  • How to perform predictive modeling on your data with EDA, data cleaning, cross-validation, and fine-tuning
  • How to deploy the model in Snowflake with Python UDFs

We hope you enjoyed the tutorial as much as we did creating it! As a closing thought, if you no longer need the session for executing queries and want to cancel any running queries, calling the close method on the Session object is a good way to go.

				
session.close()
				
			

How Hakkoda Can Help

Our highly trained team of experts at Hakkoda can help you move to a modern data stack using the latest functionalities and tools. We can put in place a data solution that suits your objectives; our knowledge and expertise take care of the rest.

At Hakkoda, we’ll leverage data engineering, data science, and even data-driven full-stack application development. To start your data innovation journey with state-of-the-art data solutions, contact us today.

More information and references

