Repository: systemml
Updated Branches:
  refs/heads/master a9c14b02b -> 4bc1fea87
[MINOR] Fixed incorrect memory estimates in Caffe2DML summary for a network with separate label and feature data layers

- Also added a warning message when the user tries to run SystemML with less free driver memory than the local memory budget.

Project: http://git-wip-us.apache.org/repos/asf/systemml/repo
Commit: http://git-wip-us.apache.org/repos/asf/systemml/commit/4bc1fea8
Tree: http://git-wip-us.apache.org/repos/asf/systemml/tree/4bc1fea8
Diff: http://git-wip-us.apache.org/repos/asf/systemml/diff/4bc1fea8

Branch: refs/heads/master
Commit: 4bc1fea872096e912045c1ea5d2d5e54b3206793
Parents: a9c14b0
Author: Niketan Pansare <npan...@us.ibm.com>
Authored: Tue Nov 14 15:59:17 2017 -0800
Committer: Niketan Pansare <npan...@us.ibm.com>
Committed: Tue Nov 14 16:03:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/sysml/api/dl/Caffe2DML.scala     | 28 +++++++++-----------
 .../sysml/api/ml/BaseSystemMLClassifier.scala   | 26 +++++++++++++++++-
 2 files changed, 37 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
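The memory column reported by summary() now also accounts for each layer's input activations. The following standalone sketch (not part of this commit) mirrors the accounting used by the updated getMemInBytes shown in the diff below; the layer dimensions and batch size are made-up illustration values:

    // Per-layer memory estimate for a hypothetical fully-connected layer:
    // 1024 inputs, 512 outputs, batch size 64, dense double-precision values.
    val batchSize    = 64L
    val numInput     = 1024L * batchSize     // bottom-layer output consumed by this layer (newly counted)
    val numOutput    = 512L * batchSize      // output activations of this layer
    val numError     = numOutput             // backpropagation errors have the same shape as the output
    val numWeights   = 1024L * 512L          // weight matrix
    val numBias      = 512L                  // bias vector
    val numGradients = (numWeights + numBias) * batchSize   // gradient buffers, as accounted by getMemInBytes
    val doubleBytes  = 8L                    // bytes per value in dense double precision
    val trainBytes   = (numInput + numOutput + numError + numWeights + numBias + numGradients) * doubleBytes
    val testBytes    = (numInput + numOutput + numWeights + numBias) * doubleBytes
    println(s"train: ${trainBytes / (1024.0 * 1024.0)} MB, test: ${testBytes / (1024.0 * 1024.0)} MB")

Before this change the numInput term was missing, so the estimate for any layer that consumes a large input was lower than the actual requirement.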
http://git-wip-us.apache.org/repos/asf/systemml/blob/4bc1fea8/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
index 56be5d6..789d08a 100644
--- a/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
+++ b/src/main/scala/org/apache/sysml/api/dl/Caffe2DML.scala
@@ -309,18 +309,26 @@ class Caffe2DML(val sc: SparkContext,
   def getTestAlgo(): String = if (inputs.containsKey("$test_algo")) inputs.get("$test_algo") else "minibatch"
 
   private def getMemInBytes(l:CaffeLayer, batchSize:Int, isTraining:Boolean):Long = {
+    val numLayerInput = if(!l.isInstanceOf[Data]) l.bottomLayerOutputShape._1.toLong * l.bottomLayerOutputShape._2.toLong * l.bottomLayerOutputShape._3.toLong * batchSize else 0
     val numLayerOutput = l.outputShape._1.toLong * l.outputShape._2.toLong * l.outputShape._3.toLong * batchSize
     val numLayerError = numLayerOutput
     val numLayerWeights = if(l.weightShape != null) l.weightShape()(0).toLong * l.weightShape()(1).toLong else 0
     val numLayerBias = if(l.biasShape != null)l.biasShape()(0).toLong * l.biasShape()(1).toLong else 0
     val numLayerGradients = (numLayerWeights + numLayerBias) * batchSize
-    if(isTraining) (numLayerOutput + numLayerError + numLayerWeights + numLayerBias + numLayerGradients)*Double.BYTES
-    else (numLayerOutput + numLayerWeights + numLayerBias)*Double.BYTES
+    if(isTraining) (numLayerInput + numLayerOutput + numLayerError + numLayerWeights + numLayerBias + numLayerGradients)*Double.BYTES
+    else (numLayerInput + numLayerOutput + numLayerWeights + numLayerBias)*Double.BYTES
   }
 
   def summary(sparkSession: org.apache.spark.sql.SparkSession): Unit = {
     val layers = net.getLayers
       .map(l => (l, net.getCaffeLayer(l)))
     val numDataLayers = layers.filter(l => l._2.isInstanceOf[Data]).length
-    val batchSize = if(numDataLayers == 1) layers.filter(l => l._2.isInstanceOf[Data]).map(l => l._2.param.getDataParam.getBatchSize).get(0) else -1
+    val batchSizes = layers.filter(l => l._2.isInstanceOf[Data]).map(l => l._2.param.getDataParam.getBatchSize).distinct
+    if(batchSizes.size > 1) {
+      Caffe2DML.LOG.warn("Multiple data layers with different batch sizes:" + batchSizes.mkString(",") + ". Using the batch size:" + batchSizes.get(0))
+    }
+    else if(batchSizes.size == 0) {
+      Caffe2DML.LOG.warn("No data layers found and hence ignoring the memory computation.")
+    }
+    val batchSize = if(batchSizes.size > 0) batchSizes.get(0) else -1
     val header = Seq("Name", "Type", "Output", "Weight", "Bias", "Top", "Bottom", "Memory* (train/test)")
     val entries = layers
       .map(l => {
@@ -347,19 +355,7 @@ class Caffe2DML(val sc: SparkContext,
     val crspq = convLayers.map(l => l.numChannels.toLong*l.kernel_h.toLong*l.kernel_w.toLong*l.outputShape._2.toLong*l.outputShape._3.toLong)
     val kpq = convLayers.map(l => l.outputShape._1.toLong*l.outputShape._2.toLong*l.outputShape._3.toLong)
 
-    if(getTrainAlgo().equals("minibatch") && getTestAlgo().equals("minibatch")) {
-      System.out.println("Total number of layer outputs/errors/weights/bias/gradients: " + numLayerOutput + "/" + numLayerError +
-        "/" + numLayerWeights + "/" + numLayerBias + "/" + numLayerGradients)
-      System.out.println("Total memory requirements for parameters* for train/test: " +
-        OptimizerUtils.toMB(layers.map(l => getMemInBytes(l._2, batchSize, true)).sum) + "/" +
-        OptimizerUtils.toMB(layers.map(l => getMemInBytes(l._2, batchSize, false)).sum))
-      System.out.println("[Advanced] Key network statistics to compute intermediate CP overhead " +
-        "batchSize/maxThreads/1-thread im2col*(sum, max)/1-thread reshape_col*(sum, max): " +
-        batchSize + "/" + OptimizerUtils.getConstrainedNumThreads(-1) + "/(" +
-        OptimizerUtils.toMB(crspq.sum*Double.BYTES) + ", " + OptimizerUtils.toMB(crspq.max*Double.BYTES) + ")/(" +
-        OptimizerUtils.toMB(kpq.sum*Double.BYTES) + ", " + OptimizerUtils.toMB(kpq.max*Double.BYTES) + ").")
-    }
-    System.out.println("* => memory in megabytes assuming the parameters are in double precision and in dense format.")
+    System.out.println("* => memory in megabytes assuming the parameters (input, output activations, weights and backpropagation errors) are in double precision and in dense format.")
   }
   // ================================================================================================
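The summary change above removes the requirement of exactly one Data layer: previously, a network with separate "features" and "label" data layers fell back to batchSize = -1, which made the per-layer memory figures incorrect. A minimal standalone sketch of the new selection logic, using a hypothetical DataLayerStub in place of CaffeLayer:

    case class DataLayerStub(name: String, batchSize: Int)   // hypothetical stand-in for a Caffe Data layer

    val dataLayers = Seq(DataLayerStub("features", 64), DataLayerStub("label", 64))
    val batchSizes = dataLayers.map(_.batchSize).distinct     // de-duplicate across all Data layers

    // One distinct size: use it. Several distinct sizes: warn and use the first.
    // No Data layer at all: warn and skip the memory computation (batchSize = -1).
    val batchSize = if (batchSizes.nonEmpty) batchSizes.head else -1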
http://git-wip-us.apache.org/repos/asf/systemml/blob/4bc1fea8/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
----------------------------------------------------------------------
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index ec086eb..ce92321 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -19,9 +19,12 @@
 package org.apache.sysml.api.ml
 
+import org.apache.commons.logging.LogFactory;
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
+
 import java.io.File
+
 import org.apache.spark.SparkContext
 import org.apache.spark.ml.{ Estimator, Model }
 import org.apache.spark.sql.types.StructType
@@ -30,12 +33,17 @@ import org.apache.sysml.runtime.matrix.MatrixCharacteristics
 import org.apache.sysml.runtime.matrix.data.MatrixBlock
 import org.apache.sysml.runtime.DMLRuntimeException
 import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtils, RDDConverterUtilsExt }
+import org.apache.sysml.api.DMLScript;
 import org.apache.sysml.api.mlcontext._
 import org.apache.sysml.api.mlcontext.ScriptFactory._
 import org.apache.spark.sql._
 import org.apache.sysml.api.mlcontext.MLContext.ExplainLevel
+import org.apache.sysml.hops.OptimizerUtils;
+
 import java.util.HashMap
+
 import scala.collection.JavaConversions._
+
 import java.util.Random
 
 /****************************************************
@@ -118,10 +126,18 @@ trait BaseSystemMLEstimatorOrModel {
   def setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters1: Int): BaseSystemMLEstimatorOrModel = { statisticsMaxHeavyHitters = statisticsMaxHeavyHitters1; this }
   def setConfigProperty(key: String, value: String): BaseSystemMLEstimatorOrModel = { config.put(key, value); this }
   def updateML(ml: MLContext): Unit = {
-    ml.setGPU(enableGPU); ml.setForceGPU(forceGPU);
+    System.gc();
+    ml.setGPU(enableGPU); ml.setForceGPU(forceGPU);
     ml.setExplain(explain); ml.setExplainLevel(explainLevel);
     ml.setStatistics(statistics); ml.setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters);
     config.map(x => ml.setConfigProperty(x._1, x._2))
+    // Since this is an approximate information, the check below only warns the users of unintended side effects
+    // (for example: holding too many strong references) and is not added as a safeguard.
+    val freeMem = Runtime.getRuntime().freeMemory();
+    if(freeMem < OptimizerUtils.getLocalMemBudget()) {
+      val LOG = LogFactory.getLog(classOf[BaseSystemMLEstimatorOrModel].getName())
+      LOG.warn("SystemML local memory budget:" + OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " mb. Approximate free memory available on the driver JVM:" + OptimizerUtils.toMB(freeMem) + " mb.");
+    }
   }
   def copyProperties(other: BaseSystemMLEstimatorOrModel): BaseSystemMLEstimatorOrModel = {
     other.setGPU(enableGPU); other.setForceGPU(forceGPU);
@@ -236,6 +252,13 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
       .in("C", C)
       .in("H", H)
       .in("W", W)
+
+    System.gc();
+    val freeMem = Runtime.getRuntime().freeMemory();
+    if(freeMem < OptimizerUtils.getLocalMemBudget()) {
+      val LOG = LogFactory.getLog(classOf[BaseSystemMLClassifierModel].getName())
+      LOG.warn("SystemML local memory budget:" + OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) + " mb. Approximate free memory abailable:" + OptimizerUtils.toMB(freeMem));
+    }
     val ret = (new MLContext(sc)).execute(script1).getMatrix("Prediction").toMatrixBlock
 
     if (ret.getNumColumns != 1 && H == 1 && W == 1) {
@@ -251,6 +274,7 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
     val script = getPredictionScript(isSingleNode)
     // Uncomment for debugging
     // ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME)
+
    val modelPredict = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros)))
    return modelPredict.getMatrix(probVar)
  }
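Both hunks above add the same driver-memory check. A minimal standalone sketch of the pattern (the logger name is illustrative; the SystemML calls are the ones used in this commit):

    import org.apache.commons.logging.LogFactory
    import org.apache.sysml.hops.OptimizerUtils

    val log = LogFactory.getLog("DriverMemoryCheck")   // illustrative logger name
    System.gc()                                        // make the free-memory reading less stale
    val freeMem = Runtime.getRuntime().freeMemory()    // approximate, hence only a warning
    if (freeMem < OptimizerUtils.getLocalMemBudget()) {
      log.warn("SystemML local memory budget: " + OptimizerUtils.toMB(OptimizerUtils.getLocalMemBudget()) +
        " mb. Approximate free memory available on the driver JVM: " + OptimizerUtils.toMB(freeMem) + " mb.")
    }

If the warning shows up, raising the driver JVM heap (for example with spark-submit's --driver-memory option) or releasing strong references held by the application are the typical remedies.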