Getting Started - Distributed Training

# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
#   Licensed under the Apache License, Version 2.0 (the "License").
#   You may not use this file except in compliance with the License.
#   A copy of the License is located at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   or in the "license" file accompanying this file. This file is distributed
#   on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
#   express or implied. See the License for the specific language governing
#   permissions and limitations under the License.
# ==============================================================================

Introduction

As Bayesian probabilistic models grow in size and consume more data, they may no longer fit on a single processor, and their training time increases significantly. MXFusion therefore integrates Horovod to carry out distributed training of Bayesian probabilistic models, which can significantly reduce the memory consumed per GPU and the overall training time.

We provide an easy interface to perform distributed training in MXFusion.

[ ]:
import warnings
warnings.filterwarnings('ignore')

Simple Example from Getting Started

We start with the same toy example from Getting Started: estimating the mean and variance of a set of data. Again, we generate the same 100 data points from a normal distribution with a given mean and variance.

First of all, initialize Horovod with hvd.init(). We also set the global context to GPU or CPU, depending on where the code is executed.

[ ]:
import horovod.mxnet as hvd
import mxnet as mx

# Initialize Horovod and bind this process to its own GPU (by local rank),
# falling back to CPU if no GPUs are available.
hvd.init()
mx.context.Context.default_ctx = mx.gpu(hvd.local_rank()) if mx.test_utils.list_gpus() else mx.cpu()
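
As an optional sanity check, each Horovod process can report its rank and the total number of workers. When the notebook is run directly (without horovodrun or mpirun), this simply prints a single worker.

[ ]:
# Optional sanity check: hvd.rank() and hvd.size() report this process's rank
# and the total number of Horovod workers (0 and 1 when run without horovodrun).
print('Horovod rank %d of %d workers' % (hvd.rank(), hvd.size()))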

The code below defines the same data and model as in Getting Started.

[ ]:
import numpy as np
np.random.seed(0)
mean_groundtruth = 3.
variance_groundtruth = 5.
N = 100
data = np.random.randn(N)*np.sqrt(variance_groundtruth) + mean_groundtruth
[ ]:
from mxfusion import Variable, Model
from mxfusion.components.variables import PositiveTransformation
from mxfusion.components.distributions import Normal
from mxfusion.common import config
config.DEFAULT_DTYPE = 'float64'

m = Model()
m.mu = Variable()
m.s = Variable(transformation=PositiveTransformation())
m.Y = Normal.define_variable(mean=m.mu, variance=m.s, shape=(N,))

To enable distributed training instead of single-processor training, we use the DistributedGradBasedInference class. Note that at this point the code is not yet running distributed training with Horovod, because we have not launched it with the horovodrun command from the system.

[ ]:
from mxfusion.inference import DistributedGradBasedInference, MAP

infr = DistributedGradBasedInference(inference_algorithm=MAP(model=m, observed=[m.Y]))
infr.run(Y=mx.nd.array(data, dtype='float64'), learning_rate=0.1, max_iter=2000, verbose=True)
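
For comparison, the single-processor version from Getting Started is identical except for the inference class. The sketch below is not executed here; it just shows the corresponding GradBasedInference call.

[ ]:
# Single-processor equivalent from Getting Started, shown for comparison only.
from mxfusion.inference import GradBasedInference, MAP

infr_single = GradBasedInference(inference_algorithm=MAP(model=m, observed=[m.Y]))
# infr_single.run(Y=mx.nd.array(data, dtype='float64'), learning_rate=0.1,
#                 max_iter=2000, verbose=True)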

After optimization, the estimated parameters are stored in an instance of the class InferenceParameters, which can be accessed from an Inference instance via infr.params.

We collect the estimated mean and variance and compare them with the parameters used to generate the data.

[ ]:
mean_estimated = infr.params[m.mu].asnumpy()
variance_estimated = infr.params[m.s].asnumpy()

print('The estimated mean and variance: %f, %f.' % (mean_estimated, variance_estimated))
print('The true mean and variance: %f, %f.' % (mean_groundtruth, variance_groundtruth))
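
Because the model places no informative prior on the mean and variance, the MAP estimate above should be close to the closed-form maximum likelihood solution, namely the sample mean and the (biased) sample variance of the data.

[ ]:
# Closed-form maximum likelihood estimates for a normal distribution:
# the sample mean and the (biased) sample variance of the observed data.
print('Closed-form ML mean and variance: %f, %f.' % (data.mean(), data.var()))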

Distributed Training on a Bayesian Model

In the example above, we performed a maximum likelihood estimate from the observed data with distributed training. Since distributed training also supports Bayesian models, we can now follow the second example from Getting Started, which uses Bayesian inference to estimate how much the estimated parameters differ from the true parameters.

[ ]:
m = Model()
m.mu = Normal.define_variable(mean=mx.nd.array([0], dtype='float64'),
                              variance=mx.nd.array([100], dtype='float64'), shape=(1,))
[ ]:
from mxfusion.components.functions import MXFusionGluonFunction

m.s_hat = Normal.define_variable(mean=mx.nd.array([5], dtype='float64'),
                                 variance=mx.nd.array([100], dtype='float64'),
                                 shape=(1,), dtype='float64')
trans_mxnet = mx.gluon.nn.HybridLambda(lambda F, x: F.Activation(x, act_type='softrelu'))
m.trans = MXFusionGluonFunction(trans_mxnet, num_outputs=1, broadcastable=True)
m.s = m.trans(m.s_hat)
m.Y = Normal.define_variable(mean=m.mu, variance=m.s, shape=(N,), dtype='float64')
[ ]:
from mxfusion.inference import create_Gaussian_meanfield

q = create_Gaussian_meanfield(model=m, observed=[m.Y])

To enable distributed training instead of single-processor training, we again use the DistributedGradBasedInference class. Its default grad_loop is DistributedBatchInferenceLoop, whereas GradBasedInference defaults to BatchInferenceLoop.

Note that at this point the code is still not running distributed training with Horovod, because we have not launched it with the horovodrun or mpirun command from the system.

[ ]:
from mxfusion.inference import StochasticVariationalInference

infr = DistributedGradBasedInference(inference_algorithm=StochasticVariationalInference(
    model=m, posterior=q, num_samples=10, observed=[m.Y]))
infr.run(Y=mx.nd.array(data, dtype='float64'), learning_rate=0.1, verbose=True)
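
Equivalently, the default grad_loop mentioned above can be passed explicitly. The sketch below assumes that DistributedBatchInferenceLoop is exported from mxfusion.inference in the same way as BatchInferenceLoop; it is shown for illustration and is not executed here.

[ ]:
# Illustration only: construct the same inference with the grad_loop made explicit.
# This assumes DistributedBatchInferenceLoop is importable from mxfusion.inference.
from mxfusion.inference import DistributedBatchInferenceLoop

infr_explicit = DistributedGradBasedInference(
    inference_algorithm=StochasticVariationalInference(
        model=m, posterior=q, num_samples=10, observed=[m.Y]),
    grad_loop=DistributedBatchInferenceLoop())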

Let’s check the resulting posterior distribution.

[ ]:
# Collect the mean-field posterior parameters of mu and s_hat.
mu_mean = infr.params[q.mu.factor.mean].asscalar()
mu_std = np.sqrt(infr.params[q.mu.factor.variance].asscalar())
s_hat_mean = infr.params[q.s_hat.factor.mean].asscalar()
s_hat_std = np.sqrt(infr.params[q.s_hat.factor.variance].asscalar())
# The variance is s = log(1 + exp(s_hat)) (softrelu), so map the corresponding
# quantiles of s_hat through the same transformation.
s_15 = np.log1p(np.exp(s_hat_mean - s_hat_std))
s_50 = np.log1p(np.exp(s_hat_mean))
s_85 = np.log1p(np.exp(s_hat_mean + s_hat_std))
print('The mean and standard deviation of the mean parameter is %f(%f). ' % (mu_mean, mu_std))
print('The 15th, 50th and 85th percentile of the variance parameter is %f, %f and %f.' % (s_15, s_50, s_85))

Running Horovod

Currently, the only way to execute Horovod with MXFusion is via the horovodrun or mpirun command from the system. Hence, we first convert this notebook into a Python file and then execute that file from the command line.

[ ]:
!jupyter nbconvert --to script getting_started-distributed.ipynb

To run it with Horovod and enable distributed training, we run horovodrun or mpirun from the system, specifying the number of processes. More details about running Horovod can be found in the Horovod documentation. A simple invocation has the format: horovodrun -np {number of processes} -H localhost:4 python {python file}

NOTE: Please restart this notebook before executing the code below.

[ ]:
!mpirun -np 4 -H localhost:4 python getting_started-distributed.py
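
Equivalently, the same script can be launched with horovodrun using the format shown above.

[ ]:
# Equivalent launch with horovodrun instead of mpirun (4 processes on localhost).
!horovodrun -np 4 -H localhost:4 python getting_started-distributed.py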