This is an automated email from the ASF dual-hosted git repository.

niketanpansare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git
The following commit(s) were added to refs/heads/master by this push:
     new b657820  [SYSTEMML-540] Added looped_minibatch training algorithm in Keras2DML
b657820 is described below

commit b657820248fbb42f1c4f27564cdb14865ebeeec1
Author: Niketan Pansare <npan...@us.ibm.com>
AuthorDate: Mon Mar 25 12:33:50 2019 -0700

    [SYSTEMML-540] Added looped_minibatch training algorithm in Keras2DML

    - This algorithm performs multiple forward-backward passes (=`parallel_batches` parameters) with the given batch size, aggregate gradients and finally updates the model.
    - Updated the documentation.
---
 docs/beginners-guide-caffe2dml.md                 |  2 +-
 docs/beginners-guide-keras2dml.md                 | 35 ++++++++++++-
 src/main/python/systemml/mllearn/estimators.py    | 11 ++--
 .../scala/org/apache/sysml/api/dl/Caffe2DML.scala | 60 ++++++++++++++--------
 4 files changed, 82 insertions(+), 26 deletions(-)

diff --git a/docs/beginners-guide-caffe2dml.md b/docs/beginners-guide-caffe2dml.md
index 8814283..db74feb 100644
--- a/docs/beginners-guide-caffe2dml.md
+++ b/docs/beginners-guide-caffe2dml.md
@@ -161,7 +161,7 @@ Iter:2000, validation loss:173.66147359346, validation accuracy:97.4897540983606
 
 Unlike Caffe where default train and test algorithm is `minibatch`, you can specify the
 algorithm using the parameters `train_algo` and `test_algo` (valid values are: `minibatch`, `allreduce_parallel_batches`,
-and `allreduce`). Here are some common settings:
+`looped_minibatch`, and `allreduce`). Here are some common settings:
 
 | | PySpark script | Changes to Network/Solver |
 |--------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------|
diff --git a/docs/beginners-guide-keras2dml.md b/docs/beginners-guide-keras2dml.md
index 4517be5..2259397 100644
--- a/docs/beginners-guide-keras2dml.md
+++ b/docs/beginners-guide-keras2dml.md
@@ -208,4 +208,37 @@ For example: for the expression `Keras2DML(..., display=100, test_iter=10, test_
 
 To verify that Keras2DML produce same results as other Keras' backend, we have [Python unit tests](https://github.com/apache/systemml/blob/master/src/main/python/tests/test_nn_numpy.py) that compare the results of Keras2DML with that of TensorFlow. We assume that Keras team ensure that all their backends are consistent with their TensorFlow backend.
-
+#### How can I train very deep models on GPU?
+
+Unlike Keras where default train and test algorithm is `minibatch`, you can specify the
+algorithm using the parameters `train_algo` and `test_algo` (valid values are: `minibatch`, `allreduce_parallel_batches`,
+`looped_minibatch`, and `allreduce`). Here are some common settings:
+
+| | PySpark script | Changes to Network/Solver |
+|--------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------|
+| Single-node CPU execution (similar to Caffe with solver_mode: CPU) | `lenet.set(train_algo="minibatch", test_algo="minibatch")` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+| Single-node single-GPU execution | `lenet.set(train_algo="minibatch", test_algo="minibatch").setGPU(True).setForceGPU(True)` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+| Single-node multi-GPU execution (similar to Caffe with solver_mode: GPU) | `lenet.set(train_algo="allreduce_parallel_batches", test_algo="minibatch", parallel_batches=num_gpu).setGPU(True).setForceGPU(True)` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+| Distributed prediction | `lenet.set(test_algo="allreduce")` | |
+| Distributed synchronous training | `lenet.set(train_algo="allreduce_parallel_batches", parallel_batches=num_cluster_cores)` | Ensure that `batch_size` is set to appropriate value (for example: 64) |
+
+Here are high-level guidelines to train very deep models on GPU with Keras2DML (and Caffe2DML):
+
+1. If there exists at least one layer/operator that does not fit on the device, please allow SystemML's optimizer to perform operator placement based on the memory estimates `sysml_model.setGPU(True)`.
+2. If each individual layer/operator fits on the device but not the entire network with a batch size of 1, then
+- Rely on SystemML's GPU Memory Manager to perform automatic eviction (recommended): `sysml_model.setGPU(True) # Optional: .setForceGPU(True)`
+- Or enable Nvidia's Unified Memory: `sysml_model.setConfigProperty('sysml.gpu.memory.allocator', 'unified_memory')`
+3. If the entire neural network does not fit in the GPU memory with the user-specified `batch_size`, but fits in the GPU memory with `local_batch_size` such that `1 << local_batch_size < batch_size`, then
+- Use either of the above two options.
+- Or enable `train_algo` that performs multiple forward-backward pass with batch size `local_batch_size`, aggregate gradients and finally updates the model:
+```python
+sysml_model = Keras2DML(spark, keras_model, batch_size=local_batch_size)
+sysml_model.set(train_algo="looped_minibatch", parallel_batches=int(batch_size/local_batch_size))
+sysml_model.setGPU(True).setForceGPU(True)
+```
+- Or add `int(batch_size/local_batch_size)` GPUs and perform single-node multi-GPU training with batch size `local_batch_size`:
+```python
+sysml_model = Keras2DML(spark, keras_model, batch_size=local_batch_size)
+sysml_model.set(train_algo="allreduce_parallel_batches", parallel_batches=int(batch_size/local_batch_size))
+sysml_model.setGPU(True).setForceGPU(True)
+```
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 456280b..0b47d8c 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -923,22 +923,23 @@ class Caffe2DML(BaseSystemMLClassifier):
     def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None,
             output_activations=None, perform_one_hot_encoding=None, parfor_parameters=None,
             inline_nn_library=None, use_builtin_lstm_fn=None,
-            perform_fused_backward_update=None):
+            perform_fused_backward_update=None, weight_parallel_batches=None):
         """
         Set input to Caffe2DML
 
         Parameters
         ----------
         debug: to add debugging DML code such as classification report, print DML script, etc (default: False)
-        train_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch)
-        test_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch)
-        parallel_batches: number of parallel batches
+        train_algo: can be minibatch, batch, allreduce_parallel_batches, looped_minibatch or allreduce (default: minibatch)
+        test_algo: can be minibatch, batch, allreduce_parallel_batches, looped_minibatch or allreduce (default: minibatch)
+        parallel_batches: number of parallel batches (required for allreduce_parallel_batches or looped_minibatch)
         output_activations: (developer flag) directory to output activations of each layer as csv while prediction. To be used only in batch mode (default: None)
         perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: True)
         parfor_parameters: dictionary for parfor parameters when using allreduce-style algorithms (default: "")
         inline_nn_library: whether to inline the NN library when generating DML using Caffe2DML (default: False)
         use_builtin_lstm_fn: whether to use builtin lstm function for LSTM layer (default: True)
         perform_fused_backward_update: whether to perform update immediately after backward pass at the script level. Supported for minibatch and batch algorithms. (default: True)
+        weight_parallel_batches: whether to multiply 1/parallel_batches to gradients before performing SGD update (default: True)
         """
         if debug is not None:
             self.estimator.setInput("$debug", str(debug).upper())
@@ -954,6 +955,8 @@ class Caffe2DML(BaseSystemMLClassifier):
             self.estimator.setInput("$use_builtin_lstm_fn", str(use_builtin_lstm_fn).upper())
         if perform_fused_backward_update is not None:
             self.estimator.setInput("$perform_fused_backward_update", str(perform_fused_backward_update).upper())
+        if weight_parallel_batches is not None:
+            self.estimator.setInput("$weight_parallel_batches", str(weight_parallel_batches).upper())
         if output_activations is not None:
             self.estimator.setInput(
                 "$output_activations",
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index c5a20db..9950d69 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -118,7 +118,7 @@ To shield from network files that violates this restriction, Caffe2DML performs
 object Caffe2DML {
   val LOG = LogFactory.getLog(classOf[Caffe2DML].getName())
   // ------------------------------------------------------------------------
-  val USE_PLUS_EQ = true
+  var USE_PLUS_EQ = true
   def nnDir = "nn/"
   def layerDir = nnDir + "layers/"
   def optimDir = nnDir + "optim/"
@@ -157,6 +157,7 @@ object Caffe2DML {
   val rand = new Random
   // Supported Algorithms:
   val MINIBATCH_ALGORITHM = "minibatch"
+  val LOOPED_MINIBATCH_ALGORITHM = "looped_minibatch"
   val BATCH_ALGORITHM = "batch"
   val ALLREDUCE_ALGORITHM = "allreduce"
   val ALLREDUCE_PARALLEL_BATCHES_ALGORITHM = "allreduce_parallel_batches"
@@ -321,6 +322,7 @@ class Caffe2DML(val sc: SparkContext,
       case "$inline_nn_library" => false
       case "$use_builtin_lstm_fn" => true
       case "$perform_fused_backward_update" => true
+      case "$weight_parallel_batches" => true
       case _ => throw new DMLRuntimeException("Unsupported input:" + key)
     }
   }
@@ -329,7 +331,7 @@ class Caffe2DML(val sc: SparkContext,
   // The below method parses the provided network and solver file and generates DML script.
   def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = {
     val startTrainingTime = System.nanoTime()
-
+
     reset // Reset the state of DML generator for training script.
 
     // Flags passed by user
@@ -357,7 +359,9 @@ class Caffe2DML(val sc: SparkContext,
       tabDMLScript.append(print(dmlConcat(asDMLString("Iterations (for training loss/accuracy) refers to the number of batches processed where batch size="), Caffe2DML.batchSize)))
     }
     if(getTrainAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM) ||
-        getTestAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM)) {
+        getTestAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM) ||
+        getTrainAlgo.toLowerCase.equals(Caffe2DML.LOOPED_MINIBATCH_ALGORITHM) ||
+        getTestAlgo.toLowerCase.equals(Caffe2DML.LOOPED_MINIBATCH_ALGORITHM)) {
       assign(tabDMLScript, "parallel_batches", "$parallel_batches")
     }
     // ----------------------------------------------------------------------------
@@ -426,7 +430,7 @@ class Caffe2DML(val sc: SparkContext,
             lrPolicy.updateLearningRate(tabDMLScript)
           }
         }
-        case Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => {
+        case Caffe2DML.LOOPED_MINIBATCH_ALGORITHM | Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM => {
           assign(tabDMLScript, "e", "0")
           assign(tabDMLScript, "max_iter", ifdef("$max_iter", solverParam.getMaxIter.toString))
           forBlock("iter", "1", "max_iter", "parallel_batches") {
@@ -436,7 +440,16 @@ class Caffe2DML(val sc: SparkContext,
               assign(tabDMLScript, "allreduce_start_index", "1")
             }
             initializeGradients("parallel_batches")
-            parForBlock("j", "1", "parallel_batches", "1", getParforParameters()) {
+            val old_USE_PLUS_EQ = Caffe2DML.USE_PLUS_EQ
+            val iterBlock = if(getTrainAlgo.toLowerCase.equals(Caffe2DML.ALLREDUCE_PARALLEL_BATCHES_ALGORITHM)) {
+              parForBlock("j", "1", "parallel_batches", "1", getParforParameters()) _
+            }
+            else {
+              Caffe2DML.USE_PLUS_EQ = true
+              forBlock("j", "1", "parallel_batches", "1") _
+            }
+
+            iterBlock {
               // Get a mini-batch in this group
               assign(tabDMLScript, "beg", "allreduce_start_index + (j-1)*" + Caffe2DML.batchSize)
               assign(tabDMLScript, "end", "allreduce_start_index + j*" + Caffe2DML.batchSize + " - 1")
@@ -463,6 +476,7 @@ class Caffe2DML(val sc: SparkContext,
               }
             }
             performSnapshot
+            Caffe2DML.USE_PLUS_EQ = old_USE_PLUS_EQ
           }
         }
         case Caffe2DML.ALLREDUCE_ALGORITHM => {
@@ -570,7 +584,7 @@ class Caffe2DML(val sc: SparkContext,
         tabDMLScript.append("# Compute validation loss & accuracy\n")
         assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0")
         getTestAlgo.toLowerCase match {
-          case Caffe2DML.MINIBATCH_ALGORITHM => {
+          case Caffe2DML.MINIBATCH_ALGORITHM | Caffe2DML.LOOPED_MINIBATCH_ALGORITHM => {
             assign(tabDMLScript, "validation_loss", "0")
             assign(tabDMLScript, "validation_accuracy", "0")
             forBlock("iVal", "1", "num_batches_per_epoch") {
@@ -695,29 +709,35 @@ class Caffe2DML(val sc: SparkContext,
     }
   }
   private def flattenGradients(): Unit = {
-    if(Caffe2DML.USE_PLUS_EQ) {
-      // Note: We multiply by a weighting to allow for proper gradient averaging during the
-      // aggregation even with uneven batch sizes.
+    if(!Caffe2DML.USE_PLUS_EQ) {
+      tabDMLScript.append("# Flatten and store gradients for this parallel execution\n")
+    }
+    val isLoopedMinibatch = getTrainAlgo.toLowerCase.equals(Caffe2DML.LOOPED_MINIBATCH_ALGORITHM)
+    val suffixDML = if(getInputBooleanValue("$weight_parallel_batches")) " * weighting" else ""
+    // Note: We multiply by a weighting to allow for proper gradient averaging during the
+    // aggregation even with uneven batch sizes.
+    if(getInputBooleanValue("$weight_parallel_batches")) {
       assign(tabDMLScript, "weighting", "1/parallel_batches") // "nrow(Xb)/X_group_batch_size")
+    }
+    if(Caffe2DML.USE_PLUS_EQ) {
       net.getLayers
         .map(layer => net.getCaffeLayer(layer))
         .map(l => {
-          if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dWeight + "_agg", l.dWeight + "*weighting")
-          if (l.shouldUpdateExtraWeight) assignPlusEq(tabDMLScript, l.dExtraWeight + "_agg", l.dExtraWeight + "*weighting")
-          if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dBias + "_agg", l.dBias + "*weighting")
+          if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dWeight + "_agg", l.dWeight + suffixDML)
+          if (l.shouldUpdateExtraWeight) assignPlusEq(tabDMLScript, l.dExtraWeight + "_agg", l.dExtraWeight + suffixDML)
+          if (l.shouldUpdateWeight) assignPlusEq(tabDMLScript, l.dBias + "_agg", l.dBias + suffixDML)
         })
     } else {
-      tabDMLScript.append("# Flatten and store gradients for this parallel execution\n")
-      // Note: We multiply by a weighting to allow for proper gradient averaging during the
-      // aggregation even with uneven batch sizes.
-      assign(tabDMLScript, "weighting", "1/parallel_batches") // "nrow(Xb)/X_group_batch_size")
+      if(isLoopedMinibatch) {
+        throw new DMLRuntimeException("Flattening and storing gradients is not supported for looped_minibatch algorithm")
+      }
       net.getLayers
         .map(layer => net.getCaffeLayer(layer))
         .map(l => {
-          if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + " * weighting")
-          if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg[j,]", matrix(l.dExtraWeight, "1", multiply(nrow(l.extraWeight), ncol(l.extraWeight))) + " * weighting")
-          if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + " * weighting")
+          if (l.shouldUpdateWeight) assign(tabDMLScript, l.dWeight + "_agg[j,]", matrix(l.dWeight, "1", multiply(nrow(l.weight), ncol(l.weight))) + suffixDML)
+          if (l.shouldUpdateExtraWeight) assign(tabDMLScript, l.dExtraWeight + "_agg[j,]", matrix(l.dExtraWeight, "1", multiply(nrow(l.extraWeight), ncol(l.extraWeight))) + suffixDML)
+          if (l.shouldUpdateWeight) assign(tabDMLScript, l.dBias + "_agg[j,]", matrix(l.dBias, "1", multiply(nrow(l.bias), ncol(l.bias))) + suffixDML)
         })
     }
   }
@@ -807,7 +827,7 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
     val lastLayerShape = estimator.getOutputShapeOfLastLayer
     assign(tabDMLScript, "Prob", matrix("1", Caffe2DML.numImages, (lastLayerShape._1 * lastLayerShape._2 * lastLayerShape._3).toString))
     estimator.getTestAlgo.toLowerCase match {
-      case Caffe2DML.MINIBATCH_ALGORITHM => {
+      case Caffe2DML.MINIBATCH_ALGORITHM | Caffe2DML.LOOPED_MINIBATCH_ALGORITHM => {
         ceilDivide(tabDMLScript(), "num_iters", Caffe2DML.numImages, Caffe2DML.batchSize)
         forBlock("iter", "1", "num_iters") {
           getTestBatch(tabDMLScript)
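The framework-free Python sketch below illustrates the idea behind the new `looped_minibatch` option, as the commit message and the `weight_parallel_batches` docstring describe it. It runs several sequential forward-backward passes of size `local_batch_size`, accumulates their gradients with a weighting of `1/parallel_batches`, and applies a single update at the end. It is not SystemML's implementation. The 1-D linear model, the mean-squared-error loss, and the helper names `mse_grad`, `looped_minibatch_step` and `minibatch_step` are hypothetical stand-ins chosen only for illustration.

```python
# Hypothetical sketch (not SystemML code): gradient accumulation in the spirit of
# train_algo="looped_minibatch", using a 1-D linear model y ~ w*x + b with MSE loss.


def mse_grad(w, b, xs, ys):
    """Gradient of mean((w*x + b - y)^2) over one mini-batch."""
    n = len(xs)
    residuals = [w * x + b - y for x, y in zip(xs, ys)]
    dw = sum(2.0 * r * x for r, x in zip(residuals, xs)) / n
    db = sum(2.0 * r for r in residuals) / n
    return dw, db


def looped_minibatch_step(w, b, xs, ys, lr, local_batch_size, parallel_batches,
                          weight_parallel_batches=True):
    """Run `parallel_batches` sequential forward/backward passes, then ONE update."""
    if len(xs) < local_batch_size * parallel_batches:
        raise ValueError("not enough rows for parallel_batches * local_batch_size")
    # Assumed analogue of the weight_parallel_batches flag: average vs. plain sum.
    weighting = 1.0 / parallel_batches if weight_parallel_batches else 1.0
    dw_agg, db_agg = 0.0, 0.0  # aggregated gradients, reset once per update
    for j in range(parallel_batches):  # plain sequential loop, not a parallel loop
        beg, end = j * local_batch_size, (j + 1) * local_batch_size
        dw, db = mse_grad(w, b, xs[beg:end], ys[beg:end])
        dw_agg += dw * weighting  # accumulate in place (+=)
        db_agg += db * weighting
    return w - lr * dw_agg, b - lr * db_agg  # single SGD update


def minibatch_step(w, b, xs, ys, lr):
    """Reference: one SGD update on the whole (large) batch at once."""
    dw, db = mse_grad(w, b, xs, ys)
    return w - lr * dw, b - lr * db


xs = [float(i) for i in range(8)]
ys = [3.0 * x + 1.0 for x in xs]
print(looped_minibatch_step(0.0, 0.0, xs, ys, lr=0.01, local_batch_size=2, parallel_batches=4))
print(minibatch_step(0.0, 0.0, xs, ys, lr=0.01))
```

The sketch assumes equal-sized mini-batches and a loss that averages over the rows of each mini-batch. Under those assumptions, the accumulated update matches one plain minibatch step on `parallel_batches * local_batch_size` rows. This is consistent with the documentation's suggestion of `parallel_batches=int(batch_size/local_batch_size)`. Passing `weight_parallel_batches=False` to the sketch sums the gradients instead of averaging them, which scales the step by `parallel_batches`.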