This is an automated email from the ASF dual-hosted git repository.

niketanpansare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git

The following commit(s) were added to refs/heads/master by this push:
     new b49ac76  [SYSTEMML-540] Invoke update immediately after backward call at the script-level.
b49ac76 is described below

commit b49ac760c180baa0582c2168c4b58fb3c0108bc4
Author: Niketan Pansare <npan...@us.ibm.com>
AuthorDate: Sat Mar 23 21:51:45 2019 -0700

    [SYSTEMML-540] Invoke update immediately after backward call at the script-level.

    - This reduces the chance of unnecessary evictions, especially when there are statement block cuts.
    - The configuration property `perform_fused_backward_update` allows the user to toggle this behavior and control the script-generation process.
    - Also, updated the release-creation document to ensure that untracked files are not included in the artifacts.
    - For ResNet-200 with a minibatch size of 96, the eviction time was reduced from 173.488 seconds to 60.048 seconds.
---
 docs/release-creation-process.md                   |  3 +
 src/main/python/systemml/mllearn/estimators.py     |  8 ++-
 .../scala/org/apache/sysml/api/dl/Caffe2DML.scala  | 76 ++++++++++++++++++----
 .../scala/org/apache/sysml/api/dl/CaffeLayer.scala |  2 +
 4 files changed, 76 insertions(+), 13 deletions(-)

diff --git a/docs/release-creation-process.md b/docs/release-creation-process.md
index 3115390..bf05000 100644
--- a/docs/release-creation-process.md
+++ b/docs/release-creation-process.md
@@ -42,6 +42,9 @@ Step 1: Prepare the release.
 
 	# Extract latest code to a directory
 	<GitRepoHome>
+
+	# Check if there are any untracked files (created by the unit tests) and remove them to avoid packaging them in the artifacts
+	git status
 
 	# Go to dev/release directory
 	cd <GitRepoHome>/dev/release
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 8d1e164..456280b 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -922,7 +922,8 @@ class Caffe2DML(BaseSystemMLClassifier):
         self.estimator.setWeightsToIgnore(ignore_weights)
 
     def set(self, debug=None, train_algo=None, test_algo=None, parallel_batches=None,
-            output_activations=None, perform_one_hot_encoding=None, parfor_parameters=None, inline_nn_library=None, use_builtin_lstm_fn=None):
+            output_activations=None, perform_one_hot_encoding=None, parfor_parameters=None, inline_nn_library=None, use_builtin_lstm_fn=None,
+            perform_fused_backward_update=None):
         """
         Set input to Caffe2DML

@@ -933,10 +934,11 @@ class Caffe2DML(BaseSystemMLClassifier):
         test_algo: can be minibatch, batch, allreduce_parallel_batches or allreduce (default: minibatch)
         parallel_batches: number of parallel batches
         output_activations: (developer flag) directory to output activations of each layer as csv while prediction. To be used only in batch mode (default: None)
-        perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: False)
+        perform_one_hot_encoding: should perform one-hot encoding in DML using table function (default: True)
         parfor_parameters: dictionary for parfor parameters when using allreduce-style algorithms (default: "")
         inline_nn_library: whether to inline the NN library when generating DML using Caffe2DML (default: False)
         use_builtin_lstm_fn: whether to use builtin lstm function for LSTM layer (default: True)
+        perform_fused_backward_update: whether to perform update immediately after backward pass at the script level. Supported for minibatch and batch algorithms (default: True)
         """
         if debug is not None:
             self.estimator.setInput("$debug", str(debug).upper())
@@ -950,6 +952,8 @@ class Caffe2DML(BaseSystemMLClassifier):
             self.estimator.setInput("$parallel_batches", str(parallel_batches))
         if use_builtin_lstm_fn is not None:
             self.estimator.setInput("$use_builtin_lstm_fn", str(use_builtin_lstm_fn).upper())
+        if perform_fused_backward_update is not None:
+            self.estimator.setInput("$perform_fused_backward_update", str(perform_fused_backward_update).upper())
         if output_activations is not None:
             self.estimator.setInput(
                 "$output_activations",
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index e480dfc..c5a20db 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -304,14 +304,27 @@ class Caffe2DML(val sc: SparkContext,
     net.getLayers.map(layer => {net.getCaffeLayer(layer).debugLayer = isDebug})
     net.getLayers.map(layer => {net.getCaffeLayer(layer).caffe2dmlObj = this})
     net.getLayers.filter(layer => net.getCaffeLayer(layer).isInstanceOf[LSTM]).map(layer => {
-      if (inputs.containsKey("$use_builtin_lstm_fn"))
-        net.getCaffeLayer(layer).asInstanceOf[LSTM].useBuiltinFunction(inputs.get("$use_builtin_lstm_fn").toLowerCase.toBoolean)
+      net.getCaffeLayer(layer).asInstanceOf[LSTM].useBuiltinFunction(getInputBooleanValue("$use_builtin_lstm_fn"))
     })
   }
 
   // Comma is included
   def getParforParameters():String = if (inputs.containsKey("$parfor_parameters")) inputs.get("$parfor_parameters") else ""
 
+  def getInputBooleanValue(key:String):Boolean = {
+    if(inputs.containsKey(key))
+      return inputs.get(key).toLowerCase.toBoolean
+    else {
+      key match {
+        case "$debug" => false
+        case "$perform_one_hot_encoding" => true
+        case "$inline_nn_library" => false
+        case "$use_builtin_lstm_fn" => true
+        case "$perform_fused_backward_update" => true
+        case _ => throw new DMLRuntimeException("Unsupported input:" + key)
+      }
+    }
+  }
   // ================================================================================================
   // The below method parses the provided network and solver file and generates DML script.
   def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = {
@@ -320,16 +333,18 @@ class Caffe2DML(val sc: SparkContext,
     reset // Reset the state of DML generator for training script.
 
     // Flags passed by user
-    val DEBUG_TRAINING = if (inputs.containsKey("$debug")) inputs.get("$debug").toLowerCase.toBoolean else false
-    Caffe2DML.INLINE_NN_LIBRARY = if (inputs.containsKey("$inline_nn_library")) inputs.get("$inline_nn_library").toLowerCase.toBoolean else false
+    val DEBUG_TRAINING = getInputBooleanValue("$debug")
+    Caffe2DML.INLINE_NN_LIBRARY = getInputBooleanValue("$inline_nn_library")
     assign(tabDMLScript, "debug", if (DEBUG_TRAINING) "TRUE" else "FALSE")
     setDebugFlags(DEBUG_TRAINING)
 
     appendHeaders(net, solver, true) // Appends DML corresponding to source and externalFunction statements.
-    val performOneHotEncoding = !inputs.containsKey("$perform_one_hot_encoding") || inputs.get("$perform_one_hot_encoding").toBoolean
+    val performOneHotEncoding = getInputBooleanValue("$perform_one_hot_encoding")
     readInputData(net, true, performOneHotEncoding) // Read X_full and y_full
     // Initialize the layers and solvers. Reads weights and bias if $weights is set.
     initWeights(net, solver, inputs.containsKey("$weights"), layersToIgnore)
+
+    val performFusedBackwardUpdate = getInputBooleanValue("$perform_fused_backward_update")
 
     // Split into training and validation set
     // Initializes Caffe2DML.X, Caffe2DML.y, Caffe2DML.XVal, Caffe2DML.yVal and Caffe2DML.numImages
@@ -355,7 +370,13 @@ class Caffe2DML(val sc: SparkContext,
           getTrainingBatch(tabDMLScript)
           // -------------------------------------------------------
          // Perform forward, backward and update on minibatch
-          forward; backward; update
+          forward;
+          if(performFusedBackwardUpdate) {
+            backwardUpdate
+          }
+          else {
+            backward; update
+          }
           // -------------------------------------------------------
           if(solverParam.getDisplay > 0) {
             ifBlock("iter %% " + solverParam.getDisplay + " == 0") {
@@ -385,7 +406,13 @@ class Caffe2DML(val sc: SparkContext,
           assign(tabDMLScript, "yb", Caffe2DML.y)
           // -------------------------------------------------------
           // Perform forward, backward and update on entire dataset
-          forward; backward; update
+          forward
+          if(performFusedBackwardUpdate) {
+            backwardUpdate
+          }
+          else {
+            backward; update
+          }
           // -------------------------------------------------------
           if(solverParam.getDisplay > 0) {
             // Show training/validation loss every epoch
@@ -416,6 +443,10 @@ class Caffe2DML(val sc: SparkContext,
               rightIndexing(tabDMLScript, "Xb", Caffe2DML.X, "beg", "end")
               rightIndexing(tabDMLScript, "yb", Caffe2DML.y, "beg", "end")
               forward; backward
+              if(performFusedBackwardUpdate && inputs.containsKey("$perform_fused_backward_update")) {
+                // Warn the user only if the user explicitly asked for it
+                Caffe2DML.LOG.warn("Fused backward update is not supported for allreduce_parallel_batches")
+              }
               flattenGradients
               if(solverParam.getDisplay > 0) {
                 ifBlock("(iter + j - 1) %% " + solverParam.getDisplay + " == 0") {
@@ -447,6 +478,10 @@ class Caffe2DML(val sc: SparkContext,
               assign(tabDMLScript, "Xb", Caffe2DML.X + "[j,]")
               assign(tabDMLScript, "yb", Caffe2DML.y + "[j,]")
               forward; backward
+              if(performFusedBackwardUpdate && inputs.containsKey("$perform_fused_backward_update")) {
+                // Warn the user only if the user explicitly asked for it
+                Caffe2DML.LOG.warn("Fused backward update is not supported for allreduce")
+              }
               flattenGradients
             }
             aggregateAggGradients
@@ -513,7 +548,7 @@ class Caffe2DML(val sc: SparkContext,
   }
 
   private def displayTrainingLoss(lossLayer: IsLossLayer, performOneHotEncoding:Boolean): Unit = {
-    val DEBUG_TRAINING = if (inputs.containsKey("$debug")) inputs.get("$debug").toLowerCase.toBoolean else false
+    val DEBUG_TRAINING = getInputBooleanValue("$debug")
     tabDMLScript.append("# Compute training loss & accuracy\n")
     assign(tabDMLScript, "loss", "0"); assign(tabDMLScript, "accuracy", "0")
     lossLayer.computeLoss(dmlScript, numTabs)
@@ -619,6 +654,25 @@ class Caffe2DML(val sc: SparkContext,
     tabDMLScript.append("# Update the parameters\n")
     net.getLayers.map(layer => solver.update(tabDMLScript, net.getCaffeLayer(layer)))
   }
+  private def backwardUpdate(): Unit = {
+    tabDMLScript.append("# Perform backward pass and update the parameters\n")
+    var skippedLayer:String = null
+    for(layer <- net.getLayers.reverse) {
+      val caffeLayer = net.getCaffeLayer(layer)
+      caffeLayer.backward(tabDMLScript, "")
+      if(caffeLayer.isInstanceOf[Scale] && caffeLayer.asInstanceOf[Scale].isPartOfBatchNorm) {
+        skippedLayer = layer // Skip the update for now
+      }
+      else {
+        solver.update(tabDMLScript, caffeLayer) // Perform the update of the current layer
+        if(skippedLayer != null) {
+          // And then update the skipped layer (if any)
+          solver.update(tabDMLScript, net.getCaffeLayer(skippedLayer))
+          skippedLayer = null
+        }
+      }
+    }
+  }
   private def initializeGradients(parallel_batches: String): Unit = {
     tabDMLScript.append("# Data structure to store gradients computed in parallel\n")
     if(Caffe2DML.USE_PLUS_EQ) {
@@ -730,13 +784,13 @@ class Caffe2DMLModel(val numClasses: String, val sc: SparkContext, val solver: C
 
     reset // Reset the state of DML generator for training script.
 
-    val DEBUG_PREDICTION = if (estimator.inputs.containsKey("$debug")) estimator.inputs.get("$debug").toLowerCase.toBoolean else false
-    Caffe2DML.INLINE_NN_LIBRARY = if (estimator.inputs.containsKey("$inline_nn_library")) estimator.inputs.get("$inline_nn_library").toLowerCase.toBoolean else false
+    val DEBUG_PREDICTION = estimator.getInputBooleanValue("$debug")
+    Caffe2DML.INLINE_NN_LIBRARY = estimator.getInputBooleanValue("$inline_nn_library")
     assign(tabDMLScript, "debug", if (DEBUG_PREDICTION) "TRUE" else "FALSE")
     estimator.setDebugFlags(DEBUG_PREDICTION)
 
     appendHeaders(net, solver, false) // Appends DML corresponding to source and externalFunction statements.
-    val performOneHotEncoding = !estimator.inputs.containsKey("$perform_one_hot_encoding") || estimator.inputs.get("$perform_one_hot_encoding").toBoolean
+    val performOneHotEncoding = estimator.getInputBooleanValue("$perform_one_hot_encoding")
     readInputData(net, false, performOneHotEncoding) // Read X_full and y_full
 
     assign(tabDMLScript, "X", "X_full")
diff --git a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
index 62323d1..bf86f38 100644
--- a/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/CaffeLayer.scala
@@ -370,6 +370,7 @@ class BatchNorm(val param: LayerParameter, val id: Int, val net: CaffeNetwork) e
     val topLayers = net.getTopLayers(param.getName).map(l => net.getCaffeLayer(l)).toList
     if (topLayers.length != 1 && !topLayers(0).isInstanceOf[Scale]) throw new LanguageException("Only one top layer of type Scale allowed for BatchNorm")
     scaleLayer = topLayers(0).asInstanceOf[Scale]
+    scaleLayer.isPartOfBatchNorm = true
   }
   def numChannels = bottomLayerOutputShape._1
   def Hin         = bottomLayerOutputShape._2
@@ -385,6 +386,7 @@ class Scale(val param: LayerParameter, val id: Int, val net: CaffeNetwork) exten
   override def backward(dmlScript: StringBuilder, outSuffix: String): Unit = assignDoutToDX(dmlScript, outSuffix)
   override def weightShape(): Array[Int] = Array(bottomLayerOutputShape._1.toInt, 1)
   override def biasShape(): Array[Int] = Array(bottomLayerOutputShape._1.toInt, 1)
+  var isPartOfBatchNorm = false
 }
 // ------------------------------------------------------------------
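
For readers who want to try the new flag from the Python mllearn API, here is a minimal usage sketch. The solver file name, input shape, Spark session, and training data are placeholder assumptions for illustration; only the `set(perform_fused_backward_update=...)` call is part of this commit.

```python
# Minimal usage sketch (hypothetical solver file, input shape and data;
# `spark` is an existing SparkSession).
from systemml.mllearn import Caffe2DML

lenet = Caffe2DML(spark, solver='lenet_solver.proto', input_shape=(1, 28, 28))

# Fused backward+update is on by default; disable it to fall back to the
# previous behavior of a full backward pass followed by a separate update pass.
lenet.set(perform_fused_backward_update=False)
lenet.fit(X_train, y_train)
```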
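To make the script-level change concrete, the sketch below contrasts the two orderings the generator can emit. It is plain Python pseudocode of the control flow (not the generated DML); `layers`, `backward`, `update`, and `is_bn_scale` are illustrative stand-ins for the generated calls.

```python
def backward_then_update(layers, backward, update):
    # Previous behavior: a full backward pass, then a separate update pass.
    # Every layer's gradients stay live until the second loop runs, which can
    # trigger evictions when statement block cuts prevent buffer reuse.
    for layer in reversed(layers):
        backward(layer)
    for layer in layers:
        update(layer)

def fused_backward_update(layers, backward, update, is_bn_scale):
    # New behavior (mirrors backwardUpdate in Caffe2DML.scala): update each
    # layer right after its backward call so its gradients can be freed early.
    # A Scale layer that is part of BatchNorm is deferred and updated together
    # with the next non-skipped layer.
    skipped = None
    for layer in reversed(layers):
        backward(layer)
        if is_bn_scale(layer):
            skipped = layer  # defer the update of this Scale layer
        else:
            update(layer)
            if skipped is not None:
                update(skipped)
                skipped = None
```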
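The new `getInputBooleanValue` helper also centralizes the defaults for all boolean inputs, which is why the docstring default for `perform_one_hot_encoding` changes to True. For quick reference, the sketch below simply restates the Scala match expression in Python; the names are illustrative, not part of the API.

```python
# Defaults applied when the corresponding $-input has not been set by the user
# (a restatement of the match expression in getInputBooleanValue).
BOOLEAN_INPUT_DEFAULTS = {
    "$debug": False,
    "$perform_one_hot_encoding": True,
    "$inline_nn_library": False,
    "$use_builtin_lstm_fn": True,
    "$perform_fused_backward_update": True,
}

def get_input_boolean_value(inputs, key):
    # User-provided values are stored as strings, mirroring inputs.get(key)
    # followed by toLowerCase.toBoolean on the Scala side.
    if key in inputs:
        return inputs[key].lower() == "true"
    if key in BOOLEAN_INPUT_DEFAULTS:
        return BOOLEAN_INPUT_DEFAULTS[key]
    raise ValueError("Unsupported input: " + key)
```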