def train_fn(checkpoint_path, learning_rate=0.01):
    # imports live inside the function because HorovodRunner pickles it
    # and runs it on each worker
    from tensorflow.keras import backend as K
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import models
    from tensorflow.keras import layers
    import horovod.tensorflow.keras as hvd
    import numpy as np
    import pandas as pd

    hvd.init()

    # each worker reads its own shard of the training data, selected by rank
    train_dat = pd.read_pickle("/dbfs/mnt/testblob/MNIST_train_rank%d.pkl" % hvd.rank())
    x_train = train_dat["image"]
    x_train = np.stack(x_train.values, axis=0)
    y_train = train_dat["label_col"]
    y_train = keras.utils.to_categorical(y_train, 10)

    # build the model
    model = models.Sequential([
        layers.Dense(784, activation='relu', input_shape=(784,)),
        layers.Dense(128, activation='relu'),
        layers.Dense(64, activation='relu'),
        layers.Dense(10, activation='softmax'),
    ])

    # scale the learning rate by the number of workers, wrap the optimizer
    # with Horovod, and compile the Keras model
    optimizer = keras.optimizers.SGD(
        learning_rate=learning_rate * hvd.size(),
        momentum=0.9)
    optimizer = hvd.DistributedOptimizer(optimizer)
    model.compile(
        optimizer=optimizer,
        loss='categorical_crossentropy',
        metrics=['accuracy'])

    # broadcast initial variables from rank 0 to all workers;
    # save checkpoints only on rank 0 to prevent conflicts
    callbacks = [hvd.callbacks.BroadcastGlobalVariablesCallback(0)]
    if hvd.rank() == 0:
        callbacks.append(keras.callbacks.ModelCheckpoint(
            checkpoint_path,
            save_weights_only=True))

    # train
    model.fit(
        x_train,
        y_train,
        batch_size=64,
        callbacks=callbacks,
        epochs=2,
        verbose=2)
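The two Horovod-specific steps in train_fn are the learning-rate scaling by hvd.size() and the hvd.DistributedOptimizer wrapper, which by default averages gradients across workers before each update. The sketch below only simulates that idea in a single process on plain Python lists. The helper names allreduce_average and sgd_step and the gradient values are hypothetical, chosen for illustration; they are not Horovod APIs, and real Horovod performs the averaging with an allreduce operation across processes.

```python
from typing import List


def allreduce_average(per_worker_grads: List[List[float]]) -> List[float]:
    """Average gradients element-wise across workers (simulated in one process)."""
    n_workers = len(per_worker_grads)
    return [sum(values) / n_workers for values in zip(*per_worker_grads)]


def sgd_step(weights: List[float], grads: List[float], learning_rate: float) -> List[float]:
    """Plain SGD update without momentum, to keep the sketch short."""
    return [w - learning_rate * g for w, g in zip(weights, grads)]


base_lr = 0.01
n_workers = 2                      # plays the role of hvd.size()
scaled_lr = base_lr * n_workers    # mirrors learning_rate * hvd.size()

# made-up gradients computed by each worker on its own shard
grads_rank0 = [0.5, -1.0]
grads_rank1 = [1.5, 3.0]

avg_grads = allreduce_average([grads_rank0, grads_rank1])
new_weights = sgd_step([1.0, 1.0], avg_grads, scaled_lr)
print(avg_grads, new_weights)
```

Because every worker applies the same averaged gradient to the same starting weights, the model replicas stay identical after each step, which is why it is enough to save the checkpoint on rank 0 only.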
from sparkdl import HorovodRunner

# run only 2 workers (rank0 and rank1)
hr = HorovodRunner(np=2)
hr.run(
    main=train_fn,
    checkpoint_path="/dbfs/mnt/testblob/horovod_trained_model/checkpoint.ckpt",
    learning_rate=0.01)
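train_fn expects one pickle file per rank (MNIST_train_rank0.pkl, MNIST_train_rank1.pkl), so the data must be split before training. How those files were produced is not shown in this notebook; the following is only a sketch of one possible splitting rule. The helper names shard_bounds and shard_filename are hypothetical, and the total of 55,000 samples is an assumption inferred from the "Train on 27500 samples" line that each of the two workers reports.

```python
from typing import Tuple


def shard_bounds(n_samples: int, rank: int, size: int) -> Tuple[int, int]:
    """Return the [start, stop) row range assigned to one rank.

    Any remainder is dropped so that every rank gets the same number of rows.
    """
    if not 0 <= rank < size:
        raise ValueError("rank must satisfy 0 <= rank < size")
    per_rank = n_samples // size
    start = rank * per_rank
    return start, start + per_rank


def shard_filename(rank: int) -> str:
    """File name pattern matching the one read inside train_fn."""
    return "MNIST_train_rank%d.pkl" % rank


n_total = 55000   # assumption: 2 workers x 27500 samples each
n_workers = 2     # matches HorovodRunner(np=2)
for rank in range(n_workers):
    start, stop = shard_bounds(n_total, rank, n_workers)
    print(shard_filename(rank), start, stop, stop - start)
```

Equal-sized shards keep the number of batches per epoch identical on every rank, so no worker finishes an epoch while another is still waiting to exchange gradients.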
/databricks/python/lib/python3.7/site-packages/botocore/vendored/requests/packages/urllib3/_collections.py:1: DeprecationWarning: Using or importing the ABCs from 'collections' instead of from 'collections.abc' is deprecated, and in 3.8 it will stop working
from collections import Mapping, MutableMapping
Using TensorFlow backend.
The global names read or written to by the pickled function are set().
The pickled object size is 2355 bytes.
### How to enable Horovod Timeline? ###
HorovodRunner has the ability to record the timeline of its activity with Horovod Timeline. To
record a Horovod Timeline, set the `HOROVOD_TIMELINE` environment variable to the location of the
timeline file to be created. You can then open the timeline file using the chrome://tracing
facility of the Chrome browser.
Start training.
Warning: Permanently added '10.139.64.5' (ECDSA) to the list of known hosts.
[1,0]<stderr>:Using TensorFlow backend.
[1,1]<stderr>:Using TensorFlow backend.
[1,1]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead.
[1,1]<stderr>:
[1,1]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead.
[1,1]<stderr>:
[1,1]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.
[1,1]<stderr>:
[1,1]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.
[1,1]<stderr>:
[1,0]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead.
[1,0]<stderr>:
[1,0]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:152: The name tf.global_variables is deprecated. Please use tf.compat.v1.global_variables instead.
[1,0]<stderr>:
[1,0]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.
[1,0]<stderr>:
[1,0]<stderr>:From /databricks/python/lib/python3.7/site-packages/horovod/tensorflow/__init__.py:178: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.
[1,0]<stderr>:
[1,1]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
[1,1]<stderr>:Instructions for updating:
[1,1]<stderr>:If using Keras pass *_constraint arguments to layers.
[1,1]<stderr>:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
[1,1]<stderr>:Instructions for updating:
[1,1]<stderr>:If using Keras pass *_constraint arguments to layers.
[1,1]<stdout>:Train on 27500 samples
[1,1]<stderr>:2021-01-29 20:51:43.579201: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations: SSE4.1 SSE4.2 AVX AVX2 FMA
[1,1]<stderr>:To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
[1,1]<stderr>:2021-01-29 20:51:43.590057: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2294685000 Hz
[1,1]<stderr>:2021-01-29 20:51:43.590420: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55c7852a44b0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
[1,1]<stderr>:2021-01-29 20:51:43.590461: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version
[1,1]<stderr>:OMP: Info #212: KMP_AFFINITY: decoding x2APIC ids.
[1,1]<stderr>:OMP: Info #210: KMP_AFFINITY: Affinity capable, using global cpuid leaf 11 info
[1,1]<stderr>:OMP: Info #154: KMP_AFFINITY: Initial OS proc set respected: 0-3
[1,1]<stderr>:OMP: Info #156: KMP_AFFINITY: 4 available OS procs
[1,1]<stderr>:OMP: Info #157: KMP_AFFINITY: Uniform topology
[1,1]<stderr>:OMP: Info #179: KMP_AFFINITY: 1 packages x 4 cores/pkg x 1 threads/core (4 total cores)
[1,1]<stderr>:OMP: Info #214: KMP_AFFINITY: OS proc to physical thread map:
[1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 0 maps to package 0 core 0
[1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 1 maps to package 0 core 1
[1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 2 maps to package 0 core 2
[1,1]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 3 maps to package 0 core 3
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2226 thread 0 bound to OS proc set 0
[1,1]<stderr>:2021-01-29 20:51:43.592346: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
[1,0]<stderr>:WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
[1,0]<stderr>:Instructions for updating:
[1,0]<stderr>:If using Keras pass *_constraint arguments to layers.
[1,0]<stderr>:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
[1,0]<stderr>:Instructions for updating:
[1,0]<stderr>:If using Keras pass *_constraint arguments to layers.
[1,1]<stdout>:Epoch 1/2
[1,0]<stdout>:Train on 27500 samples
[1,0]<stderr>:2021-01-29 20:51:43.872546: I tensorflow/core/platform/cpu_feature_guard.cc:145] This TensorFlow binary is optimized with Intel(R) MKL-DNN to use the following CPU instructions in performance critical operations: SSE4.1 SSE4.2 AVX AVX2 FMA
[1,0]<stderr>:To enable them in non-MKL-DNN operations, rebuild TensorFlow with the appropriate compiler flags.
[1,0]<stderr>:2021-01-29 20:51:43.879516: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 2294685000 Hz
[1,0]<stderr>:2021-01-29 20:51:43.879945: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x55face4bd640 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
[1,0]<stderr>:2021-01-29 20:51:43.879981: I tensorflow/compiler/xla/service/service.cc:176] StreamExecutor device (0): Host, Default Version
[1,0]<stderr>:OMP: Info #212: KMP_AFFINITY: decoding x2APIC ids.
[1,0]<stderr>:OMP: Info #210: KMP_AFFINITY: Affinity capable, using global cpuid leaf 11 info
[1,0]<stderr>:OMP: Info #154: KMP_AFFINITY: Initial OS proc set respected: 0-3
[1,0]<stderr>:OMP: Info #156: KMP_AFFINITY: 4 available OS procs
[1,0]<stderr>:OMP: Info #157: KMP_AFFINITY: Uniform topology
[1,0]<stderr>:OMP: Info #179: KMP_AFFINITY: 1 packages x 4 cores/pkg x 1 threads/core (4 total cores)
[1,0]<stderr>:OMP: Info #214: KMP_AFFINITY: OS proc to physical thread map:
[1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 0 maps to package 0 core 0
[1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 1 maps to package 0 core 1
[1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 2 maps to package 0 core 2
[1,0]<stderr>:OMP: Info #171: KMP_AFFINITY: OS proc 3 maps to package 0 core 3
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2185 thread 0 bound to OS proc set 0
[1,0]<stderr>:2021-01-29 20:51:43.882761: I tensorflow/core/common_runtime/process_util.cc:115] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2246 thread 1 bound to OS proc set 1
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2249 thread 2 bound to OS proc set 2
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2250 thread 3 bound to OS proc set 3
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2251 thread 4 bound to OS proc set 0
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2247 thread 5 bound to OS proc set 1
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2253 thread 7 bound to OS proc set 3
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2252 thread 6 bound to OS proc set 2
[1,1]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2226 tid 2254 thread 8 bound to OS proc set 0
[1,0]<stdout>:Epoch 1/2
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2205 thread 1 bound to OS proc set 1
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2208 thread 2 bound to OS proc set 2
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2209 thread 3 bound to OS proc set 3
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2210 thread 4 bound to OS proc set 0
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2206 thread 5 bound to OS proc set 1
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2212 thread 7 bound to OS proc set 3
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2213 thread 8 bound to OS proc set 0
[1,0]<stderr>:OMP: Info #250: KMP_AFFINITY: pid 2185 tid 2211 thread 6 bound to OS proc set 2
[1,1]<stdout>:27500/27500 - 19s - loss: 0.3688 - acc: 0.8921
[1,1]<stdout>:Epoch 2/2
[1,0]<stdout>:27500/27500 - 20s - loss: 0.3821 - acc: 0.8876
[1,0]<stdout>:Epoch 2/2
[1,1]<stdout>:27500/27500 - 19s - loss: 0.1212 - acc: 0.9644
[1,0]<stdout>:27500/27500 - 19s - loss: 0.1276 - acc: 0.9616
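The per-epoch result lines above are interleaved with warnings and thread-affinity messages from both workers, which makes them hard to compare. As a small convenience, the sketch below extracts them with a regular expression. It assumes the second number in the "[1,0]" prefix is the worker rank and that the lines follow the Keras verbose=2 layout seen in this output; parse_epoch_metrics is a hypothetical helper name.

```python
import re

# matches lines such as "[1,0]<stdout>:27500/27500 - 19s - loss: 0.1276 - acc: 0.9616"
LOG_LINE = re.compile(
    r"^\[\d+,(?P<rank>\d+)\]<stdout>:\d+/\d+ - (?P<secs>\d+)s"
    r" - loss: (?P<loss>[\d.]+) - acc: (?P<acc>[\d.]+)$"
)


def parse_epoch_metrics(lines):
    """Collect (rank, seconds, loss, acc) tuples from per-epoch summary lines."""
    metrics = []
    for line in lines:
        match = LOG_LINE.match(line.strip())
        if match:
            metrics.append((
                int(match["rank"]),
                int(match["secs"]),
                float(match["loss"]),
                float(match["acc"]),
            ))
    return metrics


# three lines copied from the output above; the first one is skipped
sample = [
    "[1,1]<stdout>:Epoch 2/2",
    "[1,1]<stdout>:27500/27500 - 19s - loss: 0.1212 - acc: 0.9644",
    "[1,0]<stdout>:27500/27500 - 19s - loss: 0.1276 - acc: 0.9616",
]
metrics = parse_epoch_metrics(sample)
for rank, secs, loss, acc in metrics:
    print("rank %d: %ds, loss=%.4f, acc=%.4f" % (rank, secs, loss, acc))
```

Each worker reports loss and accuracy on its own shard, so small differences between ranks are expected even though the model weights are kept in sync.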
# Check result (model) using test data
from tensorflow.keras import backend as K
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import models
from tensorflow.keras import layers
import numpy as np
import pandas as pd
# load test data
test_dat = pd.read_pickle("/dbfs/mnt/testblob/MNIST_test.pkl")
x_test = test_dat["image"]
x_test = np.stack(x_test.values, axis=0)
y_test_label = test_dat["label_col"]
# load trained model
model = models.Sequential([
    layers.Dense(784, activation='relu', input_shape=(784,)),
    layers.Dense(128, activation='relu'),
    layers.Dense(64, activation='relu'),
    layers.Dense(10, activation='softmax'),
])
model.compile(
    optimizer=keras.optimizers.SGD(learning_rate=0.01, momentum=0.9),
    loss='categorical_crossentropy',
    metrics=['accuracy'])
model.load_weights(
    "/dbfs/mnt/testblob/horovod_trained_model/checkpoint.ckpt")
# predict and list results
results = model.predict(x_test)
pred_array = np.column_stack((y_test_label.values, np.argmax(results, axis=1)))
pred_df = pd.DataFrame(
    data=pred_array,
    columns=["Actual", "Predicted"])
pred_df
WARNING:tensorflow:From /databricks/python/lib/python3.7/site-packages/tensorflow_core/python/ops/resource_variable_ops.py:1630: calling BaseResourceVariable.__init__ (from tensorflow.python.ops.resource_variable_ops) with constraint is deprecated and will be removed in a future version.
Instructions for updating:
If using Keras pass *_constraint arguments to layers.
Out[4]:
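The table of actual and predicted labels can be reduced to a single accuracy figure by counting matching rows. The sketch below does this for any two sequences of labels; the accuracy helper and the toy labels are made up for illustration and are not results from this notebook. With the DataFrame built above, it could be called as accuracy(pred_df["Actual"], pred_df["Predicted"]).

```python
def accuracy(actual, predicted):
    """Fraction of positions where the predicted label equals the actual label."""
    pairs = list(zip(actual, predicted))
    if not pairs:
        raise ValueError("no predictions to score")
    return sum(1 for a, p in pairs if a == p) / len(pairs)


# toy labels for illustration only
toy_actual = [7, 2, 1, 0, 4]
toy_predicted = [7, 2, 1, 0, 9]
print(accuracy(toy_actual, toy_predicted))  # 4 of 5 match -> 0.8
```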
Exercise 06 : Horovod Runner on Databricks Runtime for ML
In Spark, you can train not only statistical models (such as linear regressors and decision tree classifiers) but also neural networks with TensorFlow, PyTorch, and other frameworks. Here I show a distributed TensorFlow training example using the ML runtime on Databricks.
As you know, many deep learning frameworks have their own distributed training capabilities that do not require Apache Spark. (For instance, TensorFlow natively provides "Distributed TensorFlow".) However, running distributed training on Apache Spark is still useful, because you can integrate it with other operations, such as data preparation, data transformation, and other machine learning tasks, in the same consistent distributed manner.
Before starting, make sure the data files used in this exercise are available under /mnt/testblob (the code reads them through /dbfs/mnt/testblob).
The ML runtime is optimized for deep learning, and all related components (TensorFlow, Horovod, Keras, XGBoost, etc.) are already built in, so you don't need to install them yourself.
The built-in HorovodRunner on the ML runtime helps Horovod run on Apache Spark. (Horovod, developed by Uber, has an efficient parameter-sharing mechanism that is beneficial for scaling.)