Let us look at three typical ways to build models on Databricks:
- Run machine learning jobs on the driver node
- Scale out with MLlib
- Use the Databricks Runtime ML
In this blog post I will use Python and run the code in Databricks’ own notebooks (which are similar to Jupyter notebooks). Databricks also supports SQL, Scala, and R. One great feature is that the notebooks can be multi-language.
Run machine learning jobs on a single node
A Databricks cluster has one driver node and one or more worker nodes. The Databricks Runtime includes commonly used Python libraries, such as scikit-learn. However, these libraries do not distribute their algorithms.
Running an ML job only on the driver might not be what we are looking for: it is not distributed, and we could just as well run it on our own computer or in a Data Science Virtual Machine. However, some machine learning tasks can still take advantage of distributed computation, and it is a good way to take an existing single-node workflow and transition it to a distributed one.
This great example notebook, which uses scikit-learn, shows how this is done.
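To illustrate the idea, here is a minimal single-node workflow of the kind that runs entirely on the driver, just as it would on a laptop. This is an illustrative sketch, not the example notebook's exact code; the dataset and model choices are my own.

```python
# A single-node scikit-learn workflow: nothing here is distributed,
# so on Databricks it would run only on the driver node.
from sklearn.datasets import load_iris
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

X, y = load_iris(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42)

clf = RandomForestClassifier(n_estimators=100, random_state=42)
clf.fit(X_train, y_train)
accuracy = clf.score(X_test, y_test)
```

Because the code has no Spark dependency, moving it to a distributed workflow later means rewriting it against MLlib, which is what the next section covers.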
Scalable Machine Learning with MLlib
For distributed machine learning workflows we can use Spark’s machine learning library MLlib. The library includes the most common algorithms for regression, classification, clustering, and collaborative filtering. It also has algorithms for working with features.
A pipeline is built of Transformers and Estimators. A Transformer takes a DataFrame and converts it into another; feature transformation and scoring a learned model are examples of Transformers. An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. For detailed information, see the pipeline components in the MLlib documentation.
Let us look at one of the example notebooks, which uses ML Pipelines to predict bike rental counts per hour, and see how we can build such a pipeline.
The pipeline looks like this:

(Pipeline diagram. Source: GBT notebook)
First, we need to assemble the features into a feature vector using the VectorAssembler Transformer. Thereafter, a VectorIndexer Transformer identifies which columns should be treated as categorical and indexes them. MLlib has both Transformers built in.
```python
from pyspark.ml.feature import VectorAssembler, VectorIndexer

featuresCols = df.columns
featuresCols.remove('cnt')

# This concatenates all feature columns into a single feature
# vector in a new column "rawFeatures".
vectorAssembler = VectorAssembler(inputCols=featuresCols,
                                  outputCol="rawFeatures")

# This identifies categorical features and indexes them.
vectorIndexer = VectorIndexer(inputCol="rawFeatures",
                              outputCol="features",
                              maxCategories=4)
```
For training the model we use the Gradient-Boosted Trees (GBT) algorithm.

```python
from pyspark.ml.regression import GBTRegressor

# Takes the "features" column and learns to predict "cnt".
gbt = GBTRegressor(labelCol="cnt")
```
The next step is to use Spark’s cross-validation framework for hyper-parameter tuning. We wrap the model training stage within a CrossValidator stage.
```python
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Define a grid of hyper-parameters to test:
# - maxDepth: max depth of each decision tree in the GBT ensemble
# - maxIter: iterations, i.e., number of trees in each GBT ensemble
paramGrid = ParamGridBuilder()\
    .addGrid(gbt.maxDepth, [2, 5])\
    .addGrid(gbt.maxIter, [10, 100])\
    .build()

# We define an evaluation metric. This tells CrossValidator how well
# we are doing by comparing the true labels with predictions.
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol=gbt.getLabelCol(),
                                predictionCol=gbt.getPredictionCol())

# Declare the CrossValidator, which runs model tuning for us.
cv = CrossValidator(estimator=gbt, evaluator=evaluator,
                    estimatorParamMaps=paramGrid)
```
We can now assemble the pipeline. The Pipeline is itself an Estimator.

```python
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[vectorAssembler, vectorIndexer, cv])
```
The last step is to train the model by calling fit() on the pipeline with our train data set. After running fit(), the pipeline produces a pipelineModel, which is a Transformer.

```python
pipelineModel = pipeline.fit(train)
```
We can now use pipelineModel to make predictions on our test data set, and compute the RMSE to tell us how well our model predicts.

```python
predictions = pipelineModel.transform(test)
rmse = evaluator.evaluate(predictions)
print("RMSE on our test set: %g" % rmse)
```
The full notebook shows the details on how this pipeline was built.
Databricks Runtime ML
The latest Databricks Runtime 4.3 has a number of Python, R, Java, and Scala libraries installed. Another runtime is the Databricks Runtime ML, which is built on the Databricks Runtime but also includes several libraries and frameworks useful for machine learning and deep learning, such as TensorFlow, Keras, and XGBoost. It also supports distributed TensorFlow training using Horovod.
Databricks Runtime ML is in Beta.
Where to begin?
We have seen three ways of getting started with machine learning on Azure Databricks. For existing workflows, running them on the driver node is a good option. If you are building a new model, using MLlib is a better choice, as it can take advantage of the distributed computation Spark offers. Databricks Runtime ML is still in Beta and the documentation is sparse, so unless you are already familiar with, for example, distributed TensorFlow training with Horovod, it would not be the place to start.