
Exercise 06 : Horovod Runner on Databricks Runtime for ML

In Spark, you can train not only statistical models (such as linear regressors, decision tree classifiers, etc.) but also neural networks with TensorFlow, PyTorch, and so on. Here I show a distributed TensorFlow training example using the ML runtime on Databricks.
As you know, many deep learning frameworks have their own distributed training capabilities without Apache Spark. (For instance, TensorFlow natively supports "Distributed TensorFlow".) However, running distributed training on Apache Spark is still useful, because you can integrate it with other operations, such as data preparation, data transformation, and other machine learning tasks, in the same consistent distributed manner.

Before starting,

  1. Run Exercise 01 (Storage Settings) in order to mount /mnt/testblob.
  2. Copy the pickled MNIST datasets ("MNIST_train_rank0.pkl", "MNIST_train_rank1.pkl", and "MNIST_test.pkl") into this mounted blob container (/mnt/testblob).
  3. Make sure to create a cluster with Databricks Runtime ML and attach it to this notebook. (You cannot run this exercise on the standard Databricks runtime without "ML".)
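
The training files are named per rank, so each worker later reads its own shard. The source does not say how these files were produced; the following is only a minimal sketch of one possible contiguous split, and the helper names `shard_for_rank` and `shard_filename` are hypothetical.

```python
def shard_for_rank(samples, rank, size):
    """Return the contiguous slice of `samples` assigned to worker `rank`.

    Assumption: every worker gets the same number of samples, so any
    remainder at the end of `samples` is dropped.
    """
    per_worker = len(samples) // size
    start = rank * per_worker
    return samples[start:start + per_worker]


def shard_filename(rank):
    """Build the per-rank file name pattern used in this exercise."""
    return "MNIST_train_rank%d.pkl" % rank


# toy data standing in for the real training rows
samples = list(range(10))
for rank in range(2):
    print(shard_filename(rank), shard_for_rank(samples, rank, 2))
```

Equal-sized shards matter here because every worker must run the same number of training steps for the gradient exchange to stay in lockstep.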

The ML runtime is optimized for deep learning, and all related components (TensorFlow, Horovod, Keras, XGBoost, etc.) are already built in. (You don't need to install these components yourself.)
The built-in HorovodRunner on the ML runtime helps Horovod run on Apache Spark. (Horovod, originally developed by Uber, exchanges gradients between workers efficiently using ring-allreduce, which helps training scale.)
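
To give a feel for how workers share updates, here is a toy, single-process simulation of a ring allreduce that averages one gradient vector per worker. It is only a sketch of the idea: the function name `ring_allreduce_average` is hypothetical, and the real Horovod implementation performs this exchange over the network through its communication backends rather than with Python lists.

```python
def ring_allreduce_average(worker_grads):
    """Average equally sized gradient vectors with a simulated ring allreduce."""
    n = len(worker_grads)
    length = len(worker_grads[0])
    bufs = [list(g) for g in worker_grads]  # copy so inputs stay untouched
    # split the vector indices into n contiguous chunks
    bounds = [(c * length // n, (c + 1) * length // n) for c in range(n)]

    # Phase 1 (scatter-reduce): after n - 1 steps, worker i holds the
    # complete sum of chunk (i + 1) % n.
    for step in range(n - 1):
        # snapshot outgoing chunks first, since all sends happen at once
        sends = []
        for i in range(n):
            c = (i - step) % n
            lo, hi = bounds[c]
            sends.append((c, bufs[i][lo:hi]))
        for i in range(n):
            dst = (i + 1) % n
            c, data = sends[i]
            lo, _ = bounds[c]
            for k, v in enumerate(data):
                bufs[dst][lo + k] += v

    # Phase 2 (allgather): pass the completed chunks around the ring.
    for step in range(n - 1):
        sends = []
        for i in range(n):
            c = (i + 1 - step) % n
            lo, hi = bounds[c]
            sends.append((c, bufs[i][lo:hi]))
        for i in range(n):
            dst = (i + 1) % n
            c, data = sends[i]
            lo, hi = bounds[c]
            bufs[dst][lo:hi] = data

    # every worker divides the summed gradients by the number of workers
    return [[v / n for v in b] for b in bufs]


print(ring_allreduce_average([[1, 2, 3, 4], [10, 20, 30, 40]]))
# prints [[5.5, 11.0, 16.5, 22.0], [5.5, 11.0, 16.5, 22.0]]
```

Each worker only ever talks to its neighbor in the ring, so the amount of data a worker sends stays roughly constant as more workers are added.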


Create a directory to save the trained model.

dbutils.fs.mkdirs("/mnt/testblob/horovod_trained_model")
Out[2]: True

Prepare the training function, in which we train a model to predict handwritten digit images.

def train_fn(checkpoint_path, learning_rate=0.01):
  from tensorflow.keras import backend as K
  import tensorflow as tf
  from tensorflow import keras
  from tensorflow.keras import models
  from tensorflow.keras import layers
  import horovod.tensorflow.keras as hvd
  import numpy as np
  import pandas as pd
 
  hvd.init()
 
  # use a different data shard on each worker
  train_dat = pd.read_pickle("/dbfs/mnt/testblob/MNIST_train_rank%d.pkl" % hvd.rank())
  x_train = train_dat["image"]
  x_train = np.stack(x_train.values, axis=0)
  y_train = train_dat["label_col"]
  y_train = keras.utils.to_categorical(y_train, 10)
 
  # generate model
  model = models.Sequential([
    layers.Dense(784, activation='relu', input_shape=(784,)),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(10, activation='softmax'),
  ])
 
  # prepare optimizer and compile to TensorFlow model with Keras
  optimizer = keras.optimizers.SGD(
    learning_rate=learning_rate * hvd.size(),
    momentum=0.9)
  optimizer = hvd.DistributedOptimizer(optimizer)
  model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'])
 
  # broadcast the initial variables from rank 0 so all workers start identically,
  # and save checkpoints only on rank 0 to prevent conflicts
  callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
  if hvd.rank() == 0:
    callbacks.append(keras.callbacks.ModelCheckpoint(
      checkpoint_path,
      save_weights_only=True))
 
  # train
  model.fit(
    x_train,
    y_train,
    batch_size=64,
    callbacks=callbacks,
    epochs=2,
    verbose=2)
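
The training function multiplies the learning rate by hvd.size(). Because each worker processes its own batch and the gradients are averaged, one step effectively covers batch_size × workers samples, and scaling the learning rate by the same factor is a common heuristic. The arithmetic can be sketched as follows, using the values from this exercise (2 workers, batch size 64, base learning rate 0.01, 27500 samples per worker); the helper name `scaled_settings` is hypothetical.

```python
import math


def scaled_settings(base_lr, batch_size, num_workers, samples_per_worker):
    """Return (effective batch size, scaled learning rate, steps per epoch)."""
    effective_batch = batch_size * num_workers  # samples consumed per step overall
    scaled_lr = base_lr * num_workers           # linear scaling heuristic
    # each worker iterates over its own shard, so steps depend on shard size
    steps_per_epoch = math.ceil(samples_per_worker / batch_size)
    return effective_batch, scaled_lr, steps_per_epoch


print(scaled_settings(0.01, 64, 2, 27500))
# prints (128, 0.02, 430)
```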

Run the training with HorovodRunner. The function above runs on the distributed workers (executors).

from sparkdl import HorovodRunner
 
# run only 2 workers (rank0 and rank1)
hr = HorovodRunner(np=2)
hr.run(
  main=train_fn,
  checkpoint_path="/dbfs/mnt/testblob/horovod_trained_model/checkpoint.ckpt",
  learning_rate=0.01)
/databricks/python/lib/python3.7/site-packages/botocore/vendored/requests/packages/urllib3/_collections.py:1: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working from collections import Mapping, MutableMapping Using TensorFlow backend. The global names read or written to by the pickled function are set(). The pickled object size is 2355 bytes. ### How to enable Horovod Timeline? ### HorovodRunner has the ability to record the timeline of its activity with Horovod Timeline. To record a Horovod Timeline, set the `HOROVOD_TIMELINE` environment variable to the location of the timeline file to be created. You can then open the timeline file using the chrome://tracing facility of the Chrome browser. Start training. Warning: Permanently added '10.139.64.5' (ECDSA) to the list of known hosts. [1,0]<stderr>:Using TensorFlow backend. [1,1]<stderr>:Using TensorFlow backend. [1,1]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead. [1,1]<stderr>: [1,1]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead. [1,1]<stderr>: [1,1]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead. [1,1]<stderr>: [1,1]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead. 
[1,1]<stderr>: [1,0]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead. [1,0]<stderr>: [1,0]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead. [1,0]<stderr>: [1,0]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead. [1,0]<stderr>: [1,0]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead. [1,0]<stderr>: [1,1]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. [1,1]<stderr>:Instructions for updating: [1,1]<stderr>:If using Keras pass *_constraint arguments to layers. [1,1]<stderr>:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. [1,1]<stderr>:Instructions for updating: [1,1]<stderr>:If using Keras pass *_constraint arguments to layers. 
[1,1]<stdout>:Train on 27500 samples [1,1]<stderr>:2021-01-29 20:51:43.579201: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations: SSE4.1 SSE4.2 AVX AVX2 FMA [1,1]<stderr>:To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags. [1,1]<stderr>:2021-01-29 20:51:43.590057: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2294685000 Hz [1,1]<stderr>:2021-01-29 20:51:43.590420: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55c7852a44b0 initialized for platform Host (this does not guarantee that XLA will be used). Devices: [1,1]<stderr>:2021-01-29 20:51:43.590461: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version [1,1]<stderr>:OMP: Info #212: KMP_AFFINITY: decoding x2APIC ids. [1,1]<stderr>:OMP: Info #210: KMP_AFFINITY: Affinity capable, using global cpuid leaf 11 info [1,1]<stderr>:OMP: Info #154: KMP_AFFINITY: Initial OS proc set respected: 0-3 [1,1]<stderr>:OMP: Info #156: KMP_AFFINITY: 4 available OS procs [1,1]<stderr>:OMP: Info #157: KMP_AFFINITY: Uniform topology [1,1]<stderr>:OMP: Info #179: KMP_AFFINITY: 1 packages x 4 cores/pkg x 1 threads/core (4 total cores) [1,1]<stderr>:OMP: Info #214: KMP_AFFINITY: OS proc to physical thread map: [1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 0 maps to package 0 core 0 [1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 1 maps to package 0 core 1 [1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 2 maps to package 0 core 2 [1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 3 maps to package 0 core 3 [1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2226 thread 0 bound to OS proc set 0 [1,1]<stderr>:2021-01-29 20:51:43.592346: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. 
Tune using inter_op_parallelism_threads for best performance. [1,0]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. [1,0]<stderr>:Instructions for updating: [1,0]<stderr>:If using Keras pass *_constraint arguments to layers. [1,0]<stderr>:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. [1,0]<stderr>:Instructions for updating: [1,0]<stderr>:If using Keras pass *_constraint arguments to layers. [1,1]<stdout>:Epoch 1/2 [1,0]<stdout>:Train on 27500 samples [1,0]<stderr>:2021-01-29 20:51:43.872546: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations: SSE4.1 SSE4.2 AVX AVX2 FMA [1,0]<stderr>:To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags. [1,0]<stderr>:2021-01-29 20:51:43.879516: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2294685000 Hz [1,0]<stderr>:2021-01-29 20:51:43.879945: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55face4bd640 initialized for platform Host (this does not guarantee that XLA will be used). Devices: [1,0]<stderr>:2021-01-29 20:51:43.879981: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version [1,0]<stderr>:OMP: Info #212: KMP_AFFINITY: decoding x2APIC ids. 
[1,0]<stderr>:OMP: Info #210: KMP_AFFINITY: Affinity capable, using global cpuid leaf 11 info [1,0]<stderr>:OMP: Info #154: KMP_AFFINITY: Initial OS proc set respected: 0-3 [1,0]<stderr>:OMP: Info #156: KMP_AFFINITY: 4 available OS procs [1,0]<stderr>:OMP: Info #157: KMP_AFFINITY: Uniform topology [1,0]<stderr>:OMP: Info #179: KMP_AFFINITY: 1 packages x 4 cores/pkg x 1 threads/core (4 total cores) [1,0]<stderr>:OMP: Info #214: KMP_AFFINITY: OS proc to physical thread map: [1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 0 maps to package 0 core 0 [1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 1 maps to package 0 core 1 [1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 2 maps to package 0 core 2 [1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 3 maps to package 0 core 3 [1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2185 thread 0 bound to OS proc set 0 [1,0]<stderr>:2021-01-29 20:51:43.882761: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance. 
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2246 thread 1 bound to OS proc set 1
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2249 thread 2 bound to OS proc set 2
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2250 thread 3 bound to OS proc set 3
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2251 thread 4 bound to OS proc set 0
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2247 thread 5 bound to OS proc set 1
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2253 thread 7 bound to OS proc set 3
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2252 thread 6 bound to OS proc set 2
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2254 thread 8 bound to OS proc set 0
[1,0]<stdout>:Epoch 1/2
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2205 thread 1 bound to OS proc set 1
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2208 thread 2 bound to OS proc set 2
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2209 thread 3 bound to OS proc set 3
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2210 thread 4 bound to OS proc set 0
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2206 thread 5 bound to OS proc set 1
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2212 thread 7 bound to OS proc set 3
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2213 thread 8 bound to OS proc set 0
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2211 thread 6 bound to OS proc set 2
[1,1]<stdout>:27500/27500 - 19s - loss: 0.3688 - acc: 0.8921
[1,1]<stdout>:Epoch 2/2
[1,0]<stdout>:27500/27500 - 20s - loss: 0.3821 - acc: 0.8876
[1,0]<stdout>:Epoch 2/2
[1,1]<stdout>:27500/27500 - 19s - loss: 0.1212 - acc: 0.9644
[1,0]<stdout>:27500/27500 - 19s - loss: 0.1276 - acc: 0.9616

Using the trained model above, predict on the test data and compare the results with the actual labels.

# Check result (model) using test data
from tensorflow.keras import backend as K
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras import layers
import numpy as np
import pandas as pd
 
# load test data
test_dat = pd.read_pickle("/dbfs/mnt/testblob/MNIST_test.pkl")
x_test = test_dat["image"]
x_test = np.stack(x_test.values, axis=0)
y_test_label = test_dat["label_col"]
 
# rebuild the same model architecture and load the trained weights
model = models.Sequential([
  layers.Dense(784, activation='relu', input_shape=(784,)),
  layers.Dense(128, activation='relu'),
  layers.Dense(64, activation='relu'),
  layers.Dense(10, activation='softmax'),
])
model.compile(
  optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
  loss='categorical_crossentropy',
  metrics=['accuracy'])
model.load_weights(
  "/dbfs/mnt/testblob/horovod_trained_model/checkpoint.ckpt")
 
# predict and list results
results = model.predict(x_test)
pred_array = np.column_stack((y_test_label.values, np.argmax(results, axis=1)))
pred_df = pd.DataFrame(
  data = pred_array,
  columns = ["Actual", "Predicted"])
pred_df
WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version. Instructions for updating: If using Keras pass *_constraint arguments to layers. Out[4]:
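
To summarize the comparison as a single number, the fraction of matching rows can be computed. Below is a minimal sketch in plain Python; the helper name `accuracy` and the sample labels are hypothetical and are not results from this exercise. With the pred_df DataFrame above, the equivalent pandas expression would be (pred_df["Actual"] == pred_df["Predicted"]).mean().

```python
def accuracy(actual, predicted):
    """Return the fraction of positions where the prediction equals the label."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    if not actual:
        raise ValueError("cannot compute accuracy of empty inputs")
    matches = sum(1 for a, p in zip(actual, predicted) if a == p)
    return matches / len(actual)


# made-up labels for illustration only
print(accuracy([7, 2, 1, 0], [7, 2, 1, 4]))
# prints 0.75
```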

Remove the trained model. (Clean-up)
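
The clean-up cell itself is not shown here. As a hedged sketch, assuming the directory created earlier with dbutils.fs.mkdirs is the one to delete, a recursive remove through dbutils.fs.rm would do it. The wrapper name `remove_trained_model` is hypothetical; it takes the file-system utility as an argument so that it can also be exercised outside Databricks.

```python
def remove_trained_model(fs, model_dir="/mnt/testblob/horovod_trained_model"):
    """Recursively delete the model directory with a dbutils.fs-like object.

    Assumption: `fs` exposes rm(path, recurse), as dbutils.fs does in a
    Databricks notebook.
    """
    # the second argument asks for a recursive delete of the directory
    return fs.rm(model_dir, True)


# In a Databricks notebook, where dbutils is predefined, the call would be:
#   remove_trained_model(dbutils.fs)
```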