Repository: systemml Updated Branches: refs/heads/master ebb6ea612 -> f07b5a2d9
http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/dl/Utils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/dl/Utils.scala b/src/main/scala/org/apache/sysml/api/dl/Utils.scala index 2684261..5d43730 100644 --- a/src/main/scala/org/apache/sysml/api/dl/Utils.scala +++ b/src/main/scala/org/apache/sysml/api/dl/Utils.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -43,264 +43,266 @@ import org.apache.spark.api.java.JavaSparkContext object Utils { // --------------------------------------------------------------------------------------------- // Helper methods for DML generation - + // Returns number of classes if inferred from the network - def numClasses(net:CaffeNetwork):String = { - try { - return "" + net.getCaffeLayer(net.getLayers().last).outputShape._1.toLong - } catch { - case _:Throwable => { - Caffe2DML.LOG.warn("Cannot infer the number of classes from network definition. User needs to pass it via set(num_classes=...) method.") - return "$num_classes" // Expect users to provide it - } - } - } - def prettyPrintDMLScript(script:String) { - val bufReader = new BufferedReader(new StringReader(script)) - var line = bufReader.readLine(); - var lineNum = 1 - while( line != null ) { - System.out.println( "%03d".format(lineNum) + "|" + line) - lineNum = lineNum + 1 - line = bufReader.readLine() - } + def numClasses(net: CaffeNetwork): String = + try { + return "" + net.getCaffeLayer(net.getLayers().last).outputShape._1.toLong + } catch { + case _: Throwable => { + Caffe2DML.LOG.warn("Cannot infer the number of classes from network definition. User needs to pass it via set(num_classes=...) method.") + return "$num_classes" // Expect users to provide it + } + } + def prettyPrintDMLScript(script: String) { + val bufReader = new BufferedReader(new StringReader(script)) + var line = bufReader.readLine(); + var lineNum = 1 + while (line != null) { + System.out.println("%03d".format(lineNum) + "|" + line) + lineNum = lineNum + 1 + line = bufReader.readLine() + } } - + // --------------------------------------------------------------------------------------------- - def parseSolver(solverFilePath:String): CaffeSolver = parseSolver(readCaffeSolver(solverFilePath)) - def parseSolver(solver:SolverParameter): CaffeSolver = { - val momentum = if(solver.hasMomentum) solver.getMomentum else 0.0 - val lambda = if(solver.hasWeightDecay) solver.getWeightDecay else 0.0 - val delta = if(solver.hasDelta) solver.getDelta else 0.0 - - solver.getType.toLowerCase match { - case "sgd" => new SGD(lambda, momentum) - case "adagrad" => new AdaGrad(lambda, delta) - case "nesterov" => new Nesterov(lambda, momentum) - case _ => throw new DMLRuntimeException("The solver type is not supported: " + solver.getType + ". Try: SGD, AdaGrad or Nesterov.") - } - + def parseSolver(solverFilePath: String): CaffeSolver = parseSolver(readCaffeSolver(solverFilePath)) + def parseSolver(solver: SolverParameter): CaffeSolver = { + val momentum = if (solver.hasMomentum) solver.getMomentum else 0.0 + val lambda = if (solver.hasWeightDecay) solver.getWeightDecay else 0.0 + val delta = if (solver.hasDelta) solver.getDelta else 0.0 + + solver.getType.toLowerCase match { + case "sgd" => new SGD(lambda, momentum) + case "adagrad" => new AdaGrad(lambda, delta) + case "nesterov" => new Nesterov(lambda, momentum) + case _ => throw new DMLRuntimeException("The solver type is not supported: " + solver.getType + ". Try: SGD, AdaGrad or Nesterov.") + } + } - - // -------------------------------------------------------------- - // Caffe utility functions - def readCaffeNet(netFilePath:String):NetParameter = { - // Load network - val reader:InputStreamReader = getInputStreamReader(netFilePath); - val builder:NetParameter.Builder = NetParameter.newBuilder(); - TextFormat.merge(reader, builder); - return builder.build(); - } - - class CopyFloatToDoubleArray(data:java.util.List[java.lang.Float], rows:Int, cols:Int, transpose:Boolean, arr:Array[Double]) extends Thread { - override def run(): Unit = { - if(transpose) { + + // -------------------------------------------------------------- + // Caffe utility functions + def readCaffeNet(netFilePath: String): NetParameter = { + // Load network + val reader: InputStreamReader = getInputStreamReader(netFilePath); + val builder: NetParameter.Builder = NetParameter.newBuilder(); + TextFormat.merge(reader, builder); + return builder.build(); + } + + class CopyFloatToDoubleArray(data: java.util.List[java.lang.Float], rows: Int, cols: Int, transpose: Boolean, arr: Array[Double]) extends Thread { + override def run(): Unit = + if (transpose) { var iter = 0 - for(i <- 0 until cols) { - for(j <- 0 until rows) { - arr(j*cols + i) = data.get(iter).doubleValue() + for (i <- 0 until cols) { + for (j <- 0 until rows) { + arr(j * cols + i) = data.get(iter).doubleValue() iter += 1 } } - } - else { - for(i <- 0 until data.size()) { + } else { + for (i <- 0 until data.size()) { arr(i) = data.get(i).doubleValue() } } - } - } - - class CopyCaffeDeconvFloatToSystemMLDeconvDoubleArray(data:java.util.List[java.lang.Float], F:Int, C:Int, H:Int, W:Int, arr:Array[Double]) - extends CopyFloatToDoubleArray(data, C, F*H*W, false, arr) { - override def run(): Unit = { - var i = 0 - for(f <- 0 until F) { - for(c <- 0 until C) { - for(hw <- 0 until H*W) { - arr(c*F*H*W + f*H*W + hw) = data.get(i).doubleValue() - i = i+1 - } - } - } - } - } - - def allocateDeconvolutionWeight(data:java.util.List[java.lang.Float], F:Int, C:Int, H:Int, W:Int):(MatrixBlock,CopyFloatToDoubleArray) = { - val mb = new MatrixBlock(C, F*H*W, false) + } + + class CopyCaffeDeconvFloatToSystemMLDeconvDoubleArray(data: java.util.List[java.lang.Float], F: Int, C: Int, H: Int, W: Int, arr: Array[Double]) + extends CopyFloatToDoubleArray(data, C, F * H * W, false, arr) { + override def run(): Unit = { + var i = 0 + for (f <- 0 until F) { + for (c <- 0 until C) { + for (hw <- 0 until H * W) { + arr(c * F * H * W + f * H * W + hw) = data.get(i).doubleValue() + i = i + 1 + } + } + } + } + } + + def allocateDeconvolutionWeight(data: java.util.List[java.lang.Float], F: Int, C: Int, H: Int, W: Int): (MatrixBlock, CopyFloatToDoubleArray) = { + val mb = new MatrixBlock(C, F * H * W, false) mb.allocateDenseBlock() - val arr = mb.getDenseBlock + val arr = mb.getDenseBlock val thread = new CopyCaffeDeconvFloatToSystemMLDeconvDoubleArray(data, F, C, H, W, arr) - thread.start - return (mb, thread) - } - - def allocateMatrixBlock(data:java.util.List[java.lang.Float], rows:Int, cols:Int, transpose:Boolean):(MatrixBlock,CopyFloatToDoubleArray) = { - val mb = new MatrixBlock(rows, cols, false) + thread.start + return (mb, thread) + } + + def allocateMatrixBlock(data: java.util.List[java.lang.Float], rows: Int, cols: Int, transpose: Boolean): (MatrixBlock, CopyFloatToDoubleArray) = { + val mb = new MatrixBlock(rows, cols, false) mb.allocateDenseBlock() - val arr = mb.getDenseBlock + val arr = mb.getDenseBlock val thread = new CopyFloatToDoubleArray(data, rows, cols, transpose, arr) - thread.start - return (mb, thread) - } - def validateShape(shape:Array[Int], data:java.util.List[java.lang.Float], layerName:String): Unit = { - if(shape == null) + thread.start + return (mb, thread) + } + def validateShape(shape: Array[Int], data: java.util.List[java.lang.Float], layerName: String): Unit = + if (shape == null) throw new DMLRuntimeException("Unexpected weight for layer: " + layerName) - else if(shape.length != 2) + else if (shape.length != 2) throw new DMLRuntimeException("Expected shape to be of length 2:" + layerName) - else if(shape(0)*shape(1) != data.size()) - throw new DMLRuntimeException("Incorrect size of blob from caffemodel for the layer " + layerName + ". Expected of size " + shape(0)*shape(1) + ", but found " + data.size()) - } - - def saveCaffeModelFile(sc:JavaSparkContext, deployFilePath:String, - caffeModelFilePath:String, outputDirectory:String, format:String):Unit = { - saveCaffeModelFile(sc.sc, deployFilePath, caffeModelFilePath, outputDirectory, format) - } - - def saveCaffeModelFile(sc:SparkContext, deployFilePath:String, caffeModelFilePath:String, outputDirectory:String, format:String):Unit = { - val inputVariables = new java.util.HashMap[String, MatrixBlock]() - readCaffeNet(new CaffeNetwork(deployFilePath), deployFilePath, caffeModelFilePath, inputVariables) - val ml = new MLContext(sc) - val dmlScript = new StringBuilder - if(inputVariables.keys.size == 0) - throw new DMLRuntimeException("No weights found in the file " + caffeModelFilePath) - for(input <- inputVariables.keys) { - dmlScript.append("write(" + input + ", \"" + outputDirectory + "/" + input + ".mtx\", format=\"" + format + "\");\n") - } - if(Caffe2DML.LOG.isDebugEnabled()) - Caffe2DML.LOG.debug("Executing the script:" + dmlScript.toString) - val script = org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript.toString()).in(inputVariables) - ml.execute(script) - } - - def readCaffeNet(net:CaffeNetwork, netFilePath:String, weightsFilePath:String, inputVariables:java.util.HashMap[String, MatrixBlock]):NetParameter = { - // Load network - val reader:InputStreamReader = getInputStreamReader(netFilePath); - val builder:NetParameter.Builder = NetParameter.newBuilder(); - TextFormat.merge(reader, builder); - // Load weights - val inputStream = CodedInputStream.newInstance(new FileInputStream(weightsFilePath)) - inputStream.setSizeLimit(Integer.MAX_VALUE) - builder.mergeFrom(inputStream) - val net1 = builder.build(); - - val asyncThreads = new java.util.ArrayList[CopyFloatToDoubleArray]() - val v1Layers = net1.getLayersList.map(layer => layer.getName -> layer).toMap - for(layer <- net1.getLayerList) { - val blobs = if(layer.getBlobsCount != 0) layer.getBlobsList else if(v1Layers.contains(layer.getName)) v1Layers.get(layer.getName).get.getBlobsList else null - - if(blobs == null || blobs.size == 0) { - // No weight or bias - Caffe2DML.LOG.debug("The layer:" + layer.getName + " has no blobs") - } - else if(blobs.size == 2) { - // Both weight and bias - val caffe2DMLLayer = net.getCaffeLayer(layer.getName) - val transpose = caffe2DMLLayer.isInstanceOf[InnerProduct] - - // weight - val data = blobs(0).getDataList - val shape = caffe2DMLLayer.weightShape() - if(shape == null) - throw new DMLRuntimeException("Didnot expect weights for the layer " + layer.getName) - validateShape(shape, data, layer.getName) - - val ret1 = if(caffe2DMLLayer.isInstanceOf[DeConvolution]) { - // Swap dimensions: Caffe's format (F, C*Hf*Wf) to NN layer's format (C, F*Hf*Wf). - val deconvLayer = caffe2DMLLayer.asInstanceOf[DeConvolution] - val C = shape(0) - val F = deconvLayer.numKernels.toInt - val Hf = deconvLayer.kernel_h.toInt - val Wf = deconvLayer.kernel_w.toInt - allocateDeconvolutionWeight(data, F, C, Hf, Wf) - } - else { - allocateMatrixBlock(data, shape(0), shape(1), transpose) - } - asyncThreads.add(ret1._2) - inputVariables.put(caffe2DMLLayer.weight, ret1._1) - - // bias - val biasData = blobs(1).getDataList - val biasShape = caffe2DMLLayer.biasShape() - if(biasShape == null) - throw new DMLRuntimeException("Didnot expect bias for the layer " + layer.getName) - validateShape(biasShape, biasData, layer.getName) - val ret2 = allocateMatrixBlock(biasData, biasShape(0), biasShape(1), transpose) - asyncThreads.add(ret2._2) - inputVariables.put(caffe2DMLLayer.bias, ret2._1) - Caffe2DML.LOG.debug("Read weights/bias for layer:" + layer.getName) - } - else if(blobs.size == 1) { - // Special case: convolution/deconvolution without bias - // TODO: Extend nn layers to handle this situation + Generalize this to other layers, for example: InnerProduct - val caffe2DMLLayer = net.getCaffeLayer(layer.getName) - val convParam = if((caffe2DMLLayer.isInstanceOf[Convolution] || caffe2DMLLayer.isInstanceOf[DeConvolution]) && caffe2DMLLayer.param.hasConvolutionParam()) caffe2DMLLayer.param.getConvolutionParam else null - if(convParam == null) - throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) - else if(convParam.hasBiasTerm && convParam.getBiasTerm) - throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " and with bias term is not supported for the layer " + layer.getName) - - val data = blobs(0).getDataList - val shape = caffe2DMLLayer.weightShape() - validateShape(shape, data, layer.getName) - val ret1 = allocateMatrixBlock(data, shape(0), shape(1), false) - asyncThreads.add(ret1._2) - inputVariables.put(caffe2DMLLayer.weight, ret1._1) - inputVariables.put(caffe2DMLLayer.bias, new MatrixBlock(convParam.getNumOutput, 1, false)) - Caffe2DML.LOG.debug("Read only weight for layer:" + layer.getName) - } - else { - throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) - } - } - - // Wait for the copy to be finished - for(t <- asyncThreads) { - t.join() - } - - for(mb <- inputVariables.values()) { - mb.recomputeNonZeros(); - } - - // Return the NetParameter without - return readCaffeNet(netFilePath) - } - - def readCaffeSolver(solverFilePath:String):SolverParameter = { - val reader = getInputStreamReader(solverFilePath); - val builder = SolverParameter.newBuilder(); - TextFormat.merge(reader, builder); - return builder.build(); - } - - // -------------------------------------------------------------- - // File IO utility functions - def writeToFile(content:String, filePath:String): Unit = { - val pw = new java.io.PrintWriter(new File(filePath)) - pw.write(content) - pw.close - } - def getInputStreamReader(filePath:String ):InputStreamReader = { - //read solver script from file - if(filePath == null) - throw new LanguageException("file path was not specified!"); - if(filePath.startsWith("hdfs:") || filePath.startsWith("gpfs:")) { - val fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); - return new InputStreamReader(fs.open(new Path(filePath))); - } - else { - return new InputStreamReader(new FileInputStream(new File(filePath)), "ASCII"); - } - } - // -------------------------------------------------------------- + else if (shape(0) * shape(1) != data.size()) + throw new DMLRuntimeException( + "Incorrect size of blob from caffemodel for the layer " + layerName + ". Expected of size " + shape(0) * shape(1) + ", but found " + data.size() + ) + + def saveCaffeModelFile(sc: JavaSparkContext, deployFilePath: String, caffeModelFilePath: String, outputDirectory: String, format: String): Unit = + saveCaffeModelFile(sc.sc, deployFilePath, caffeModelFilePath, outputDirectory, format) + + def saveCaffeModelFile(sc: SparkContext, deployFilePath: String, caffeModelFilePath: String, outputDirectory: String, format: String): Unit = { + val inputVariables = new java.util.HashMap[String, MatrixBlock]() + readCaffeNet(new CaffeNetwork(deployFilePath), deployFilePath, caffeModelFilePath, inputVariables) + val ml = new MLContext(sc) + val dmlScript = new StringBuilder + if (inputVariables.keys.size == 0) + throw new DMLRuntimeException("No weights found in the file " + caffeModelFilePath) + for (input <- inputVariables.keys) { + dmlScript.append("write(" + input + ", \"" + outputDirectory + "/" + input + ".mtx\", format=\"" + format + "\");\n") + } + if (Caffe2DML.LOG.isDebugEnabled()) + Caffe2DML.LOG.debug("Executing the script:" + dmlScript.toString) + val script = org.apache.sysml.api.mlcontext.ScriptFactory.dml(dmlScript.toString()).in(inputVariables) + ml.execute(script) + } + + def readCaffeNet(net: CaffeNetwork, netFilePath: String, weightsFilePath: String, inputVariables: java.util.HashMap[String, MatrixBlock]): NetParameter = { + // Load network + val reader: InputStreamReader = getInputStreamReader(netFilePath); + val builder: NetParameter.Builder = NetParameter.newBuilder(); + TextFormat.merge(reader, builder); + // Load weights + val inputStream = CodedInputStream.newInstance(new FileInputStream(weightsFilePath)) + inputStream.setSizeLimit(Integer.MAX_VALUE) + builder.mergeFrom(inputStream) + val net1 = builder.build(); + + val asyncThreads = new java.util.ArrayList[CopyFloatToDoubleArray]() + val v1Layers = net1.getLayersList.map(layer => layer.getName -> layer).toMap + + for (i <- 0 until net1.getLayerList.length) { + val layer = net1.getLayerList.get(i) + val blobs = getBlobs(layer, v1Layers) + + if (blobs == null || blobs.size == 0) { + // No weight or bias + Caffe2DML.LOG.debug("The layer:" + layer.getName + " has no blobs") + } else if (blobs.size == 2 || (blobs.size == 3 && net.getCaffeLayer(layer.getName).isInstanceOf[BatchNorm])) { + // Both weight and bias + val caffe2DMLLayer = net.getCaffeLayer(layer.getName) + val transpose = caffe2DMLLayer.isInstanceOf[InnerProduct] + + // weight + val shape = caffe2DMLLayer.weightShape() + if (shape == null) + throw new DMLRuntimeException("Didnot expect weights for the layer " + layer.getName) + if (caffe2DMLLayer.isInstanceOf[DeConvolution]) { + val data = blobs(0).getDataList + validateShape(shape, data, layer.getName) + // Swap dimensions: Caffe's format (F, C*Hf*Wf) to NN layer's format (C, F*Hf*Wf). + val deconvLayer = caffe2DMLLayer.asInstanceOf[DeConvolution] + val C = shape(0) + val F = deconvLayer.numKernels.toInt + val Hf = deconvLayer.kernel_h.toInt + val Wf = deconvLayer.kernel_w.toInt + val ret1 = allocateDeconvolutionWeight(data, F, C, Hf, Wf) + asyncThreads.add(ret1._2) + inputVariables.put(caffe2DMLLayer.weight, ret1._1) + } else { + inputVariables.put(caffe2DMLLayer.weight, getMBFromBlob(blobs(0), shape, layer.getName, transpose, asyncThreads)) + } + + // bias + val biasShape = caffe2DMLLayer.biasShape() + if (biasShape == null) + throw new DMLRuntimeException("Didnot expect bias for the layer " + layer.getName) + inputVariables.put(caffe2DMLLayer.bias, getMBFromBlob(blobs(1), biasShape, layer.getName, transpose, asyncThreads)) + Caffe2DML.LOG.debug("Read weights/bias for layer:" + layer.getName) + } else if (blobs.size == 1) { + // Special case: convolution/deconvolution without bias + // TODO: Extend nn layers to handle this situation + Generalize this to other layers, for example: InnerProduct + val caffe2DMLLayer = net.getCaffeLayer(layer.getName) + val convParam = + if ((caffe2DMLLayer.isInstanceOf[Convolution] || caffe2DMLLayer.isInstanceOf[DeConvolution]) && caffe2DMLLayer.param.hasConvolutionParam()) + caffe2DMLLayer.param.getConvolutionParam + else null + if (convParam == null) + throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) + else if (convParam.hasBiasTerm && convParam.getBiasTerm) + throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " and with bias term is not supported for the layer " + layer.getName) + + inputVariables.put(caffe2DMLLayer.weight, getMBFromBlob(blobs(0), caffe2DMLLayer.weightShape(), layer.getName, false, asyncThreads)) + inputVariables.put(caffe2DMLLayer.bias, new MatrixBlock(convParam.getNumOutput, 1, false)) + Caffe2DML.LOG.debug("Read only weight for layer:" + layer.getName) + } else { + throw new DMLRuntimeException("Layer with blob count " + layer.getBlobsCount + " is not supported for the layer " + layer.getName) + } + } + + // Wait for the copy to be finished + for (t <- asyncThreads) { + t.join() + } + + for (mb <- inputVariables.values()) { + mb.recomputeNonZeros(); + } + + // Return the NetParameter without + return readCaffeNet(netFilePath) + } + + def getBlobs(layer: LayerParameter, v1Layers: scala.collection.immutable.Map[String, caffe.Caffe.V1LayerParameter]): java.util.List[caffe.Caffe.BlobProto] = + if (layer.getBlobsCount != 0) + layer.getBlobsList + else if (v1Layers.contains(layer.getName)) v1Layers.get(layer.getName).get.getBlobsList + else null + + def getMBFromBlob(blob: caffe.Caffe.BlobProto, + shape: Array[Int], + layerName: String, + transpose: Boolean, + asyncThreads: java.util.ArrayList[CopyFloatToDoubleArray]): MatrixBlock = { + val data = blob.getDataList + validateShape(shape, data, layerName) + val ret1 = allocateMatrixBlock(data, shape(0), shape(1), transpose) + asyncThreads.add(ret1._2) + return ret1._1 + } + + def readCaffeSolver(solverFilePath: String): SolverParameter = { + val reader = getInputStreamReader(solverFilePath); + val builder = SolverParameter.newBuilder(); + TextFormat.merge(reader, builder); + return builder.build(); + } + + // -------------------------------------------------------------- + // File IO utility functions + def writeToFile(content: String, filePath: String): Unit = { + val pw = new java.io.PrintWriter(new File(filePath)) + pw.write(content) + pw.close + } + def getInputStreamReader(filePath: String): InputStreamReader = { + //read solver script from file + if (filePath == null) + throw new LanguageException("file path was not specified!"); + if (filePath.startsWith("hdfs:") || filePath.startsWith("gpfs:")) { + val fs = FileSystem.get(ConfigurationManager.getCachedJobConf()); + return new InputStreamReader(fs.open(new Path(filePath))); + } else { + return new InputStreamReader(new FileInputStream(new File(filePath)), "ASCII"); + } + } + // -------------------------------------------------------------- } class Utils { - def saveCaffeModelFile(sc:JavaSparkContext, deployFilePath:String, - caffeModelFilePath:String, outputDirectory:String, format:String):Unit = { + def saveCaffeModelFile(sc: JavaSparkContext, deployFilePath: String, caffeModelFilePath: String, outputDirectory: String, format: String): Unit = Utils.saveCaffeModelFile(sc, deployFilePath, caffeModelFilePath, outputDirectory, format) - } - + } http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index f42acb5..ec086eb 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -23,13 +23,13 @@ import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext -import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.{ Estimator, Model } import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.{ DoubleParam, Param, ParamMap, Params } import org.apache.sysml.runtime.matrix.MatrixCharacteristics import org.apache.sysml.runtime.matrix.data.MatrixBlock import org.apache.sysml.runtime.DMLRuntimeException -import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt, RDDConverterUtils } +import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtils, RDDConverterUtilsExt } import org.apache.sysml.api.mlcontext._ import org.apache.sysml.api.mlcontext.ScriptFactory._ import org.apache.spark.sql._ @@ -38,12 +38,11 @@ import java.util.HashMap import scala.collection.JavaConversions._ import java.util.Random - /**************************************************** DESIGN DOCUMENT for MLLEARN API: -The mllearn API supports LogisticRegression, LinearRegression, SVM, NaiveBayes +The mllearn API supports LogisticRegression, LinearRegression, SVM, NaiveBayes and Caffe2DML. Every algorithm in this API has a python wrapper (implemented in the mllearn python package) -and a Scala class where the actual logic is implementation. +and a Scala class where the actual logic is implementation. Both wrapper and scala class follow the below hierarchy to reuse code and simplify the implementation. @@ -72,7 +71,6 @@ get the DML script. To enable this, each wrapper class has to implement followin 2. getPredictionScript(isSingleNode:Boolean): (Script object of mlcontext, variable name of X in the script:String) ****************************************************/ - trait HasLaplace extends Params { final val laplace: Param[Double] = new Param[Double](this, "laplace", "Laplace smoothing specified by the user to avoid creation of 0 probabilities.") setDefault(laplace, 1.0) @@ -105,27 +103,27 @@ trait HasRegParam extends Params { } trait BaseSystemMLEstimatorOrModel { - var enableGPU:Boolean = false - var forceGPU:Boolean = false - var explain:Boolean = false - var explainLevel:String = "runtime" - var statistics:Boolean = false - var statisticsMaxHeavyHitters:Int = 10 - val config:HashMap[String, String] = new HashMap[String, String]() - def setGPU(enableGPU1:Boolean):BaseSystemMLEstimatorOrModel = { enableGPU = enableGPU1; this} - def setForceGPU(enableGPU1:Boolean):BaseSystemMLEstimatorOrModel = { forceGPU = enableGPU1; this} - def setExplain(explain1:Boolean):BaseSystemMLEstimatorOrModel = { explain = explain1; this} - def setExplainLevel(explainLevel1:String):BaseSystemMLEstimatorOrModel = { explainLevel = explainLevel1; this } - def setStatistics(statistics1:Boolean):BaseSystemMLEstimatorOrModel = { statistics = statistics1; this} - def setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters1:Int):BaseSystemMLEstimatorOrModel = { statisticsMaxHeavyHitters = statisticsMaxHeavyHitters1; this} - def setConfigProperty(key:String, value:String):BaseSystemMLEstimatorOrModel = { config.put(key, value); this} - def updateML(ml:MLContext):Unit = { + var enableGPU: Boolean = false + var forceGPU: Boolean = false + var explain: Boolean = false + var explainLevel: String = "runtime" + var statistics: Boolean = false + var statisticsMaxHeavyHitters: Int = 10 + val config: HashMap[String, String] = new HashMap[String, String]() + def setGPU(enableGPU1: Boolean): BaseSystemMLEstimatorOrModel = { enableGPU = enableGPU1; this } + def setForceGPU(enableGPU1: Boolean): BaseSystemMLEstimatorOrModel = { forceGPU = enableGPU1; this } + def setExplain(explain1: Boolean): BaseSystemMLEstimatorOrModel = { explain = explain1; this } + def setExplainLevel(explainLevel1: String): BaseSystemMLEstimatorOrModel = { explainLevel = explainLevel1; this } + def setStatistics(statistics1: Boolean): BaseSystemMLEstimatorOrModel = { statistics = statistics1; this } + def setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters1: Int): BaseSystemMLEstimatorOrModel = { statisticsMaxHeavyHitters = statisticsMaxHeavyHitters1; this } + def setConfigProperty(key: String, value: String): BaseSystemMLEstimatorOrModel = { config.put(key, value); this } + def updateML(ml: MLContext): Unit = { ml.setGPU(enableGPU); ml.setForceGPU(forceGPU); ml.setExplain(explain); ml.setExplainLevel(explainLevel); - ml.setStatistics(statistics); ml.setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters); + ml.setStatistics(statistics); ml.setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters); config.map(x => ml.setConfigProperty(x._1, x._2)) } - def copyProperties(other:BaseSystemMLEstimatorOrModel):BaseSystemMLEstimatorOrModel = { + def copyProperties(other: BaseSystemMLEstimatorOrModel): BaseSystemMLEstimatorOrModel = { other.setGPU(enableGPU); other.setForceGPU(forceGPU); other.setExplain(explain); other.setExplainLevel(explainLevel); other.setStatistics(statistics); other.setStatisticsMaxHeavyHitters(statisticsMaxHeavyHitters); @@ -136,172 +134,168 @@ trait BaseSystemMLEstimatorOrModel { trait BaseSystemMLEstimator extends BaseSystemMLEstimatorOrModel { def transformSchema(schema: StructType): StructType = schema - var mloutput:MLResults = null + var mloutput: MLResults = null // Returns the script and variables for X and y - def getTrainingScript(isSingleNode:Boolean):(Script, String, String) - - def toDouble(i:Int): java.lang.Double = { + def getTrainingScript(isSingleNode: Boolean): (Script, String, String) + + def toDouble(i: Int): java.lang.Double = double2Double(i.toDouble) - } - - def toDouble(d:Double): java.lang.Double = { + + def toDouble(d: Double): java.lang.Double = double2Double(d) - } - + } trait BaseSystemMLEstimatorModel extends BaseSystemMLEstimatorOrModel { - def toDouble(i:Int): java.lang.Double = { + def toDouble(i: Int): java.lang.Double = double2Double(i.toDouble) - } - def toDouble(d:Double): java.lang.Double = { + def toDouble(d: Double): java.lang.Double = double2Double(d) - } - + def transform_probability(X: MatrixBlock): MatrixBlock; - + def transformSchema(schema: StructType): StructType = schema - + // Returns the script and variable for X - def getPredictionScript(isSingleNode:Boolean): (Script, String) - def baseEstimator():BaseSystemMLEstimator - def modelVariables():List[String] + def getPredictionScript(isSingleNode: Boolean): (Script, String) + def baseEstimator(): BaseSystemMLEstimator + def modelVariables(): List[String] // self.model.load(self.sc._jsc, weights, format, sep) - def load(sc:JavaSparkContext, outputDir:String, sep:String, eager:Boolean=false):Unit = { - val dmlScript = new StringBuilder - dmlScript.append("print(\"Loading the model from " + outputDir + "...\")\n") - val tmpSum = "tmp_sum_var" + Math.abs((new Random()).nextInt()) - if(eager) - dmlScript.append(tmpSum + " = 0\n") - for(varName <- modelVariables) { - dmlScript.append(varName + " = read(\"" + outputDir + sep + varName + ".mtx\")\n") - if(eager) - dmlScript.append(tmpSum + " = " + tmpSum + " + 0.001*mean(" + varName + ")\n") - } - if(eager) { - dmlScript.append("if(" + tmpSum + " > 0) { print(\"Loaded the model\"); } else { print(\"Loaded the model.\"); }") - } - val script = dml(dmlScript.toString) - for(varName <- modelVariables) { - script.out(varName) - } - val ml = new MLContext(sc) - baseEstimator.mloutput = ml.execute(script) + def load(sc: JavaSparkContext, outputDir: String, sep: String, eager: Boolean = false): Unit = { + val dmlScript = new StringBuilder + dmlScript.append("print(\"Loading the model from " + outputDir + "...\")\n") + val tmpSum = "tmp_sum_var" + Math.abs((new Random()).nextInt()) + if (eager) + dmlScript.append(tmpSum + " = 0\n") + for (varName <- modelVariables) { + dmlScript.append(varName + " = read(\"" + outputDir + sep + varName + ".mtx\")\n") + if (eager) + dmlScript.append(tmpSum + " = " + tmpSum + " + 0.001*mean(" + varName + ")\n") + } + if (eager) { + dmlScript.append("if(" + tmpSum + " > 0) { print(\"Loaded the model\"); } else { print(\"Loaded the model.\"); }") + } + val script = dml(dmlScript.toString) + for (varName <- modelVariables) { + script.out(varName) + } + val ml = new MLContext(sc) + baseEstimator.mloutput = ml.execute(script) + } + def save(sc: JavaSparkContext, outputDir: String, format: String = "binary", sep: String = "/"): Unit = { + if (baseEstimator.mloutput == null) throw new DMLRuntimeException("Cannot save as you need to train the model first using fit") + val dmlScript = new StringBuilder + dmlScript.append("print(\"Saving the model to " + outputDir + "...\")\n") + for (varName <- modelVariables) { + dmlScript.append("write(" + varName + ", \"" + outputDir + sep + varName + ".mtx\", format=\"" + format + "\")\n") + } + val script = dml(dmlScript.toString) + for (varName <- modelVariables) { + script.in(varName, baseEstimator.mloutput.getMatrix(varName)) + } + val ml = new MLContext(sc) + ml.execute(script) } - def save(sc:JavaSparkContext, outputDir:String, format:String="binary", sep:String="/"):Unit = { - if(baseEstimator.mloutput == null) throw new DMLRuntimeException("Cannot save as you need to train the model first using fit") - val dmlScript = new StringBuilder - dmlScript.append("print(\"Saving the model to " + outputDir + "...\")\n") - for(varName <- modelVariables) { - dmlScript.append("write(" + varName + ", \"" + outputDir + sep + varName + ".mtx\", format=\"" + format + "\")\n") - } - val script = dml(dmlScript.toString) - for(varName <- modelVariables) { - script.in(varName, baseEstimator.mloutput.getMatrix(varName)) - } - val ml = new MLContext(sc) - ml.execute(script) - } } trait BaseSystemMLClassifier extends BaseSystemMLEstimator { def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true - val ml = new MLContext(sc) + val ml = new MLContext(sc) updateML(ml) y_mb.recomputeNonZeros(); - val ret = getTrainingScript(isSingleNode) + val ret = getTrainingScript(isSingleNode) val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb) ml.execute(script) } def baseFit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { val isSingleNode = false - val ml = new MLContext(df.rdd.sparkContext) + val ml = new MLContext(df.rdd.sparkContext) updateML(ml) - val mcXin = new MatrixCharacteristics() - val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) + val mcXin = new MatrixCharacteristics() + val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) val revLabelMapping = new java.util.HashMap[Int, String] - val yin = df.select("label") - val ret = getTrainingScript(isSingleNode) - val mmXin = new MatrixMetadata(mcXin) - val Xbin = new Matrix(Xin, mmXin) - val script = ret._1.in(ret._2, Xbin).in(ret._3, yin) + val yin = df.select("label") + val ret = getTrainingScript(isSingleNode) + val mmXin = new MatrixMetadata(mcXin) + val Xbin = new Matrix(Xin, mmXin) + val script = ret._1.in(ret._2, Xbin).in(ret._3, yin) ml.execute(script) } } trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { - def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1) - - def baseTransform(X: MatrixBlock, sc: SparkContext, probVar:String, C:Int, H: Int, W:Int): MatrixBlock = { + def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1) + + def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = { val Prob = baseTransformHelper(X, sc, probVar, C, H, W) val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W);") - .out("Prediction").in("Prob", Prob.toMatrixBlock, Prob.getMatrixMetadata).in("C", C).in("H", H).in("W", W) + .out("Prediction") + .in("Prob", Prob.toMatrixBlock, Prob.getMatrixMetadata) + .in("C", C) + .in("H", H) + .in("W", W) val ret = (new MLContext(sc)).execute(script1).getMatrix("Prediction").toMatrixBlock - - if(ret.getNumColumns != 1 && H == 1 && W == 1) { + + if (ret.getNumColumns != 1 && H == 1 && W == 1) { throw new RuntimeException("Expected predicted label to be a column vector") } return ret } - - def baseTransformHelper(X: MatrixBlock, sc: SparkContext, probVar:String, C:Int, H: Int, W:Int): Matrix = { - val isSingleNode = true - val ml = new MLContext(sc) + + def baseTransformHelper(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): Matrix = { + val isSingleNode = true + val ml = new MLContext(sc) updateML(ml) val script = getPredictionScript(isSingleNode) // Uncomment for debugging // ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME) val modelPredict = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros))) return modelPredict.getMatrix(probVar) - } - - def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar:String): MatrixBlock = { - baseTransformProbability(X, sc, probVar, -1, 1, 1) - } - - def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar:String, C:Int, H: Int, W:Int): MatrixBlock = { - return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock - } - - - def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, - probVar:String, outputProb:Boolean=true): DataFrame = { - baseTransform(df, sc, probVar, outputProb, -1, 1, 1) - } - - def baseTransformHelper(df: ScriptsUtils.SparkDataType, sc: SparkContext, - probVar:String, outputProb:Boolean, C:Int, H: Int, W:Int): Matrix = { - val isSingleNode = false - val ml = new MLContext(sc) + } + + def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = + baseTransformProbability(X, sc, probVar, -1, 1, 1) + + def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = + return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock + + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean = true): DataFrame = + baseTransform(df, sc, probVar, outputProb, -1, 1, 1) + + def baseTransformHelper(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean, C: Int, H: Int, W: Int): Matrix = { + val isSingleNode = false + val ml = new MLContext(sc) updateML(ml) - val mcXin = new MatrixCharacteristics() - val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) - val script = getPredictionScript(isSingleNode) - val mmXin = new MatrixMetadata(mcXin) - val Xin_bin = new Matrix(Xin, mmXin) + val mcXin = new MatrixCharacteristics() + val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) + val script = getPredictionScript(isSingleNode) + val mmXin = new MatrixMetadata(mcXin) + val Xin_bin = new Matrix(Xin, mmXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) return modelPredict.getMatrix(probVar) - } + } - def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, - probVar:String, outputProb:Boolean, C:Int, H: Int, W:Int): DataFrame = { + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean, C: Int, H: Int, W: Int): DataFrame = { val Prob = baseTransformHelper(df, sc, probVar, outputProb, C, H, W) val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W);") - .out("Prediction").in("Prob", Prob).in("C", C).in("H", H).in("W", W) + .out("Prediction") + .in("Prob", Prob) + .in("C", C) + .in("H", H) + .in("W", W) val predLabelOut = (new MLContext(sc)).execute(script1) - val predictedDF = predLabelOut.getDataFrame("Prediction").select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") - - if(outputProb) { - val prob = Prob.toDFVectorWithIDColumn().withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability") + val predictedDF = predLabelOut.getDataFrame("Prediction").select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") + + if (outputProb) { + val prob = Prob.toDFVectorWithIDColumn().withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability") val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, PredictionUtils.joinUsingID(prob, predictedDF)) - } - else { + } else { val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, predictedDF) } - + } } http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala index 5610bf3..d94655b 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,71 +22,71 @@ package org.apache.sysml.api.ml import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext -import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.{ Estimator, Model } import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.{ DoubleParam, Param, ParamMap, Params } import org.apache.sysml.runtime.matrix.MatrixCharacteristics import org.apache.sysml.runtime.matrix.data.MatrixBlock import org.apache.sysml.runtime.DMLRuntimeException -import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt, RDDConverterUtils } +import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtils, RDDConverterUtilsExt } import org.apache.sysml.api.mlcontext._ import org.apache.sysml.api.mlcontext.ScriptFactory._ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { - + def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true - val ml = new MLContext(sc) + val ml = new MLContext(sc) updateML(ml) - val ret = getTrainingScript(isSingleNode) + val ret = getTrainingScript(isSingleNode) val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb) ml.execute(script) } - + def baseFit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { val isSingleNode = false - val ml = new MLContext(df.rdd.sparkContext) + val ml = new MLContext(df.rdd.sparkContext) updateML(ml) - val mcXin = new MatrixCharacteristics() - val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame], mcXin, false, true) - val yin = df.select("label") - val ret = getTrainingScript(isSingleNode) - val mmXin = new MatrixMetadata(mcXin) - val Xbin = new Matrix(Xin, mmXin) + val mcXin = new MatrixCharacteristics() + val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame], mcXin, false, true) + val yin = df.select("label") + val ret = getTrainingScript(isSingleNode) + val mmXin = new MatrixMetadata(mcXin) + val Xbin = new Matrix(Xin, mmXin) val script = ret._1.in(ret._2, Xbin).in(ret._3, yin) ml.execute(script) } } trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { - - def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar:String): MatrixBlock = { + + def baseTransform(X: MatrixBlock, sc: SparkContext, predictionVar: String): MatrixBlock = { val isSingleNode = true - val ml = new MLContext(sc) + val ml = new MLContext(sc) updateML(ml) - val script = getPredictionScript(isSingleNode) + val script = getPredictionScript(isSingleNode) val modelPredict = ml.execute(script._1.in(script._2, X)) - val ret = modelPredict.getMatrix(predictionVar).toMatrixBlock - - if(ret.getNumColumns != 1) { + val ret = modelPredict.getMatrix(predictionVar).toMatrixBlock + + if (ret.getNumColumns != 1) { throw new RuntimeException("Expected prediction to be a column vector") } return ret } - - def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, predictionVar:String): DataFrame = { + + def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, predictionVar: String): DataFrame = { val isSingleNode = false - val ml = new MLContext(sc) + val ml = new MLContext(sc) updateML(ml) - val mcXin = new MatrixCharacteristics() - val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame], mcXin, false, true) - val script = getPredictionScript(isSingleNode) - val mmXin = new MatrixMetadata(mcXin) - val Xin_bin = new Matrix(Xin, mmXin) + val mcXin = new MatrixCharacteristics() + val Xin = RDDConverterUtils.dataFrameToBinaryBlock(df.rdd.sparkContext, df.asInstanceOf[DataFrame], mcXin, false, true) + val script = getPredictionScript(isSingleNode) + val mmXin = new MatrixMetadata(mcXin) + val Xin_bin = new Matrix(Xin, mmXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) - val predictedDF = modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") - val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) + val predictedDF = modelPredict.getDataFrame(predictionVar).select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") + val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN) return PredictionUtils.joinUsingID(dataset, predictedDF) } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala index b7634d7..b6f4966 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,10 @@ package org.apache.sysml.api.ml import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext -import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.{ Estimator, Model } import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.{ DoubleParam, Param, ParamMap, Params } import org.apache.sysml.runtime.matrix.MatrixCharacteristics import org.apache.sysml.runtime.matrix.data.MatrixBlock import org.apache.sysml.runtime.DMLRuntimeException @@ -39,28 +39,32 @@ object LinearRegression { } // algorithm = "direct-solve", "conjugate-gradient" -class LinearRegression(override val uid: String, val sc: SparkContext, val solver:String="direct-solve") - extends Estimator[LinearRegressionModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with BaseSystemMLRegressor { - - def setIcpt(value: Int) = set(icpt, value) - def setMaxIter(value: Int) = set(maxOuterIter, value) +class LinearRegression(override val uid: String, val sc: SparkContext, val solver: String = "direct-solve") + extends Estimator[LinearRegressionModel] + with HasIcpt + with HasRegParam + with HasTol + with HasMaxOuterIter + with BaseSystemMLRegressor { + + def setIcpt(value: Int) = set(icpt, value) + def setMaxIter(value: Int) = set(maxOuterIter, value) def setRegParam(value: Double) = set(regParam, value) - def setTol(value: Double) = set(tol, value) - + def setTol(value: Double) = set(tol, value) override def copy(extra: ParamMap): Estimator[LinearRegressionModel] = { val that = new LinearRegression(uid, sc, solver) copyValues(that, extra) } - - - def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { - val script = dml(ScriptsUtils.getDMLScript( - if(solver.compareTo("direct-solve") == 0) LinearRegression.scriptPathDS - else if(solver.compareTo("newton-cg") == 0) LinearRegression.scriptPathCG - else throw new DMLRuntimeException("The algorithm should be direct-solve or newton-cg"))) - .in("$X", " ") + + def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = { + val script = dml( + ScriptsUtils.getDMLScript( + if (solver.compareTo("direct-solve") == 0) LinearRegression.scriptPathDS + else if (solver.compareTo("newton-cg") == 0) LinearRegression.scriptPathCG + else throw new DMLRuntimeException("The algorithm should be direct-solve or newton-cg") + ) + ).in("$X", " ") .in("$Y", " ") .in("$B", " ") .in("$Log", " ") @@ -72,41 +76,46 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve .out("beta_out") (script, "X", "y") } - - def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = { + + def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = { mloutput = baseFit(X_mb, y_mb, sc) new LinearRegressionModel(this) } - - def fit(df: ScriptsUtils.SparkDataType): LinearRegressionModel = { + + def fit(df: ScriptsUtils.SparkDataType): LinearRegressionModel = { mloutput = baseFit(df, sc) new LinearRegressionModel(this) } - + } -class LinearRegressionModel(override val uid: String)(estimator:LinearRegression, val sc: SparkContext) extends Model[LinearRegressionModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with BaseSystemMLRegressorModel { +class LinearRegressionModel(override val uid: String)(estimator: LinearRegression, val sc: SparkContext) + extends Model[LinearRegressionModel] + with HasIcpt + with HasRegParam + with HasTol + with HasMaxOuterIter + with BaseSystemMLRegressorModel { override def copy(extra: ParamMap): LinearRegressionModel = { val that = new LinearRegressionModel(uid)(estimator, sc) copyValues(that, extra) } - + def transform_probability(X: MatrixBlock): MatrixBlock = throw new DMLRuntimeException("Unsupported method") - - def baseEstimator():BaseSystemMLEstimator = estimator - - def this(estimator:LinearRegression) = { - this("model")(estimator, estimator.sc) + + def baseEstimator(): BaseSystemMLEstimator = estimator + + def this(estimator: LinearRegression) = { + this("model")(estimator, estimator.sc) } - - def getPredictionScript(isSingleNode:Boolean): (Script, String) = + + def getPredictionScript(isSingleNode: Boolean): (Script, String) = PredictionUtils.getGLMPredictionScript(estimator.mloutput.getMatrix("beta_out"), isSingleNode) - - def modelVariables():List[String] = List[String]("beta_out") - + + def modelVariables(): List[String] = List[String]("beta_out") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") - - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") - -} \ No newline at end of file + + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") + +} http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index b04acd1..98b6dd4 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,10 @@ package org.apache.sysml.api.ml import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext -import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.{ Estimator, Model } import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.{ DoubleParam, Param, ParamMap, Params } import org.apache.sysml.runtime.matrix.MatrixCharacteristics import org.apache.sysml.runtime.matrix.data.MatrixBlock import org.apache.sysml.runtime.DMLRuntimeException @@ -38,36 +38,40 @@ object LogisticRegression { } /** - * Logistic Regression Scala API - */ -class LogisticRegression(override val uid: String, val sc: SparkContext) extends Estimator[LogisticRegressionModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter with BaseSystemMLClassifier { + * Logistic Regression Scala API + */ +class LogisticRegression(override val uid: String, val sc: SparkContext) + extends Estimator[LogisticRegressionModel] + with HasIcpt + with HasRegParam + with HasTol + with HasMaxOuterIter + with HasMaxInnerIter + with BaseSystemMLClassifier { - def setIcpt(value: Int) = set(icpt, value) + def setIcpt(value: Int) = set(icpt, value) def setMaxOuterIter(value: Int) = set(maxOuterIter, value) def setMaxInnerIter(value: Int) = set(maxInnerIter, value) - def setRegParam(value: Double) = set(regParam, value) - def setTol(value: Double) = set(tol, value) + def setRegParam(value: Double) = set(regParam, value) + def setTol(value: Double) = set(tol, value) override def copy(extra: ParamMap): LogisticRegression = { val that = new LogisticRegression(uid, sc) copyValues(that, extra) } - // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = { mloutput = baseFit(X_mb, y_mb, sc) new LogisticRegressionModel(this) } - + def fit(df: ScriptsUtils.SparkDataType): LogisticRegressionModel = { mloutput = baseFit(df, sc) new LogisticRegressionModel(this) } - - - def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { + + def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = { val script = dml(ScriptsUtils.getDMLScript(LogisticRegression.scriptPath)) .in("$X", " ") .in("$Y", " ") @@ -86,36 +90,39 @@ object LogisticRegressionModel { } /** - * Logistic Regression Scala API - */ - -class LogisticRegressionModel(override val uid: String)( - estimator: LogisticRegression, val sc: SparkContext) - extends Model[LogisticRegressionModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter with BaseSystemMLClassifierModel { + * Logistic Regression Scala API + */ +class LogisticRegressionModel(override val uid: String)(estimator: LogisticRegression, val sc: SparkContext) + extends Model[LogisticRegressionModel] + with HasIcpt + with HasRegParam + with HasTol + with HasMaxOuterIter + with HasMaxInnerIter + with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): LogisticRegressionModel = { val that = new LogisticRegressionModel(uid)(estimator, sc) copyValues(that, extra) } - var outputRawPredictions = true - def setOutputRawPredictions(outRawPred:Boolean): Unit = { outputRawPredictions = outRawPred } - def this(estimator:LogisticRegression) = { - this("model")(estimator, estimator.sc) + var outputRawPredictions = true + def setOutputRawPredictions(outRawPred: Boolean): Unit = outputRawPredictions = outRawPred + def this(estimator: LogisticRegression) = { + this("model")(estimator, estimator.sc) } - def getPredictionScript(isSingleNode:Boolean): (Script, String) = + def getPredictionScript(isSingleNode: Boolean): (Script, String) = PredictionUtils.getGLMPredictionScript(estimator.mloutput.getMatrix("B_out"), isSingleNode, 3) - - def baseEstimator():BaseSystemMLEstimator = estimator - def modelVariables():List[String] = List[String]("B_out") - - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") - def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "means") + + def baseEstimator(): BaseSystemMLEstimator = estimator + def modelVariables(): List[String] = List[String]("B_out") + + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "means") + def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "means") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "means") } /** - * Example code for Logistic Regression - */ + * Example code for Logistic Regression + */ object LogisticRegressionExample { import org.apache.spark.{ SparkConf, SparkContext } import org.apache.spark.sql._ @@ -124,28 +131,34 @@ object LogisticRegressionExample { import org.apache.spark.ml.feature.LabeledPoint def main(args: Array[String]) = { - val sparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate(); + val sparkSession = SparkSession.builder().master("local").appName("TestLocal").getOrCreate(); val sc: SparkContext = sparkSession.sparkContext; import sparkSession.implicits._ - val training = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), - LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), - LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) - val lr = new LogisticRegression("log", sc) + val training = sc.parallelize( + Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), + LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), + LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)) + ) + ) + val lr = new LogisticRegression("log", sc) val lrmodel = lr.fit(training.toDF) // lrmodel.mloutput.getDF(sparkSession, "B_out").show() - val testing = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), - LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), - LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) + val testing = sc.parallelize( + Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), + LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), + LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)) + ) + ) lrmodel.transform(testing.toDF).show } http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala index 990ab52..8ecd4f0 100644 --- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala +++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,10 @@ package org.apache.sysml.api.ml import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext -import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.{ Estimator, Model } import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.{ DoubleParam, Param, ParamMap, Params } import org.apache.sysml.runtime.matrix.MatrixCharacteristics import org.apache.sysml.runtime.matrix.data.MatrixBlock import org.apache.sysml.runtime.DMLRuntimeException @@ -43,19 +43,19 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat copyValues(that, extra) } def setLaplace(value: Double) = set(laplace, value) - + // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = { mloutput = baseFit(X_mb, y_mb, sc) new NaiveBayesModel(this) } - + def fit(df: ScriptsUtils.SparkDataType): NaiveBayesModel = { mloutput = baseFit(df, sc) new NaiveBayesModel(this) } - - def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { + + def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = { val script = dml(ScriptsUtils.getDMLScript(NaiveBayes.scriptPath)) .in("$X", " ") .in("$Y", " ") @@ -68,49 +68,47 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat } } - object NaiveBayesModel { final val scriptPath = "scripts" + File.separator + "algorithms" + File.separator + "naive-bayes-predict.dml" } -class NaiveBayesModel(override val uid: String) - (estimator:NaiveBayes, val sc: SparkContext) - extends Model[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifierModel { - - def this(estimator:NaiveBayes) = { +class NaiveBayesModel(override val uid: String)(estimator: NaiveBayes, val sc: SparkContext) extends Model[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifierModel { + + def this(estimator: NaiveBayes) = { this("model")(estimator, estimator.sc) } - + override def copy(extra: ParamMap): NaiveBayesModel = { val that = new NaiveBayesModel(uid)(estimator, sc) copyValues(that, extra) } - - def modelVariables():List[String] = List[String]("classPrior", "classConditionals") - def getPredictionScript(isSingleNode:Boolean): (Script, String) = { + + def modelVariables(): List[String] = List[String]("classPrior", "classConditionals") + def getPredictionScript(isSingleNode: Boolean): (Script, String) = { val script = dml(ScriptsUtils.getDMLScript(NaiveBayesModel.scriptPath)) .in("$X", " ") .in("$prior", " ") .in("$conditionals", " ") .in("$probabilities", " ") .out("probs") - - val classPrior = estimator.mloutput.getMatrix("classPrior") + + val classPrior = estimator.mloutput.getMatrix("classPrior") val classConditionals = estimator.mloutput.getMatrix("classConditionals") - val ret = if(isSingleNode) { - script.in("prior", classPrior.toMatrixBlock, classPrior.getMatrixMetadata) - .in("conditionals", classConditionals.toMatrixBlock, classConditionals.getMatrixMetadata) - } - else { - script.in("prior", classPrior.toBinaryBlocks, classPrior.getMatrixMetadata) - .in("conditionals", classConditionals.toBinaryBlocks, classConditionals.getMatrixMetadata) + val ret = if (isSingleNode) { + script + .in("prior", classPrior.toMatrixBlock, classPrior.getMatrixMetadata) + .in("conditionals", classConditionals.toMatrixBlock, classConditionals.getMatrixMetadata) + } else { + script + .in("prior", classPrior.toBinaryBlocks, classPrior.getMatrixMetadata) + .in("conditionals", classConditionals.toBinaryBlocks, classConditionals.getMatrixMetadata) } (ret, "D") } - - def baseEstimator():BaseSystemMLEstimator = estimator - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs") - def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "probs") + + def baseEstimator(): BaseSystemMLEstimator = estimator + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "probs") + def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "probs") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "probs") - + } http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala b/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala index 3406169..72e82e8 100644 --- a/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala +++ b/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -33,39 +33,35 @@ import org.apache.sysml.api.mlcontext.Script import org.apache.sysml.api.mlcontext.Matrix object PredictionUtils { - - def getGLMPredictionScript(B_full: Matrix, isSingleNode:Boolean, dfam:java.lang.Integer=1): (Script, String) = { + + def getGLMPredictionScript(B_full: Matrix, isSingleNode: Boolean, dfam: java.lang.Integer = 1): (Script, String) = { val script = dml(ScriptsUtils.getDMLScript(LogisticRegressionModel.scriptPath)) .in("$X", " ") .in("$B", " ") .in("$dfam", dfam) .out("means") - val ret = if(isSingleNode) { + val ret = if (isSingleNode) { script.in("B_full", B_full.toMatrixBlock, B_full.getMatrixMetadata) - } - else { + } else { script.in("B_full", B_full) } (ret, "X") } - - def joinUsingID(df1:DataFrame, df2:DataFrame):DataFrame = { + + def joinUsingID(df1: DataFrame, df2: DataFrame): DataFrame = df1.join(df2, RDDConverterUtils.DF_ID_COLUMN) - } - - def computePredictedClassLabelsFromProbability(mlscoreoutput:MLResults, isSingleNode:Boolean, sc:SparkContext, inProbVar:String): MLResults = { - val ml = new org.apache.sysml.api.mlcontext.MLContext(sc) - val script = dml( - """ + + def computePredictedClassLabelsFromProbability(mlscoreoutput: MLResults, isSingleNode: Boolean, sc: SparkContext, inProbVar: String): MLResults = { + val ml = new org.apache.sysml.api.mlcontext.MLContext(sc) + val script = dml(""" Prob = read("temp1"); Prediction = rowIndexMax(Prob); # assuming one-based label mapping write(Prediction, "tempOut", "csv"); """).out("Prediction") val probVar = mlscoreoutput.getMatrix(inProbVar) - if(isSingleNode) { + if (isSingleNode) { ml.execute(script.in("Prob", probVar.toMatrixBlock, probVar.getMatrixMetadata)) - } - else { + } else { ml.execute(script.in("Prob", probVar)) } } http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/SVM.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala index 9107836..2013385 100644 --- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala +++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -22,10 +22,10 @@ package org.apache.sysml.api.ml import org.apache.spark.rdd.RDD import java.io.File import org.apache.spark.SparkContext -import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.{ Estimator, Model } import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.{ DoubleParam, Param, ParamMap, Params } import org.apache.sysml.runtime.matrix.MatrixCharacteristics import org.apache.sysml.runtime.matrix.data.MatrixBlock import org.apache.sysml.runtime.DMLRuntimeException @@ -34,25 +34,30 @@ import org.apache.sysml.api.mlcontext._ import org.apache.sysml.api.mlcontext.ScriptFactory._ object SVM { - final val scriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm.dml" + final val scriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm.dml" final val scriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm.dml" } -class SVM (override val uid: String, val sc: SparkContext, val isMultiClass:Boolean=false) extends Estimator[SVMModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with BaseSystemMLClassifier { +class SVM(override val uid: String, val sc: SparkContext, val isMultiClass: Boolean = false) + extends Estimator[SVMModel] + with HasIcpt + with HasRegParam + with HasTol + with HasMaxOuterIter + with BaseSystemMLClassifier { - def setIcpt(value: Int) = set(icpt, value) - def setMaxIter(value: Int) = set(maxOuterIter, value) + def setIcpt(value: Int) = set(icpt, value) + def setMaxIter(value: Int) = set(maxOuterIter, value) def setRegParam(value: Double) = set(regParam, value) - def setTol(value: Double) = set(tol, value) - + def setTol(value: Double) = set(tol, value) + override def copy(extra: ParamMap): Estimator[SVMModel] = { val that = new SVM(uid, sc, isMultiClass) copyValues(that, extra) } - - def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { - val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVM.scriptPathMulticlass else SVM.scriptPathBinary)) + + def getTrainingScript(isSingleNode: Boolean): (Script, String, String) = { + val script = dml(ScriptsUtils.getDMLScript(if (isMultiClass) SVM.scriptPathMulticlass else SVM.scriptPathBinary)) .in("$X", " ") .in("$Y", " ") .in("$model", " ") @@ -64,58 +69,56 @@ class SVM (override val uid: String, val sc: SparkContext, val isMultiClass:Bool .out("w") (script, "X", "Y") } - + // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = { mloutput = baseFit(X_mb, y_mb, sc) new SVMModel(this, isMultiClass) } - + def fit(df: ScriptsUtils.SparkDataType): SVMModel = { mloutput = baseFit(df, sc) new SVMModel(this, isMultiClass) } - + } object SVMModel { - final val predictionScriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm-predict.dml" + final val predictionScriptPathBinary = "scripts" + File.separator + "algorithms" + File.separator + "l2-svm-predict.dml" final val predictionScriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm-predict.dml" } -class SVMModel (override val uid: String)(estimator:SVM, val sc: SparkContext, val isMultiClass:Boolean) - extends Model[SVMModel] with BaseSystemMLClassifierModel { +class SVMModel(override val uid: String)(estimator: SVM, val sc: SparkContext, val isMultiClass: Boolean) extends Model[SVMModel] with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): SVMModel = { val that = new SVMModel(uid)(estimator, sc, isMultiClass) copyValues(that, extra) } - - def this(estimator:SVM, isMultiClass:Boolean) = { - this("model")(estimator, estimator.sc, isMultiClass) + + def this(estimator: SVM, isMultiClass: Boolean) = { + this("model")(estimator, estimator.sc, isMultiClass) } - - def baseEstimator():BaseSystemMLEstimator = estimator - def modelVariables():List[String] = List[String]("w") - - def getPredictionScript(isSingleNode:Boolean): (Script, String) = { - val script = dml(ScriptsUtils.getDMLScript(if(isMultiClass) SVMModel.predictionScriptPathMulticlass else SVMModel.predictionScriptPathBinary)) + + def baseEstimator(): BaseSystemMLEstimator = estimator + def modelVariables(): List[String] = List[String]("w") + + def getPredictionScript(isSingleNode: Boolean): (Script, String) = { + val script = dml(ScriptsUtils.getDMLScript(if (isMultiClass) SVMModel.predictionScriptPathMulticlass else SVMModel.predictionScriptPathBinary)) .in("$X", " ") .in("$model", " ") .out("scores") - - val w = estimator.mloutput.getMatrix("w") - val wVar = if(isMultiClass) "W" else "w" - - val ret = if(isSingleNode) { + + val w = estimator.mloutput.getMatrix("w") + val wVar = if (isMultiClass) "W" else "w" + + val ret = if (isSingleNode) { script.in(wVar, w.toMatrixBlock, w.getMatrixMetadata) - } - else { + } else { script.in(wVar, w) } (ret, "X") } - - def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores") - def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "scores") + + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, sc, "scores") + def transform_probability(X: MatrixBlock): MatrixBlock = baseTransformProbability(X, sc, "scores") def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, sc, "scores") } http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala b/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala index 2ba0f2b..016457e 100644 --- a/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala +++ b/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,16 +26,16 @@ import org.apache.sysml.runtime.DMLRuntimeException object ScriptsUtils { var systemmlHome = System.getenv("SYSTEMML_HOME") - + type SparkDataType = org.apache.spark.sql.Dataset[_] // org.apache.spark.sql.DataFrame for Spark 1.x /** - * set SystemML home - */ + * set SystemML home + */ def setSystemmlHome(path: String) { systemmlHome = path } - + /* * Internal function to get dml path */ @@ -49,7 +49,7 @@ object ScriptsUtils { */ private[sysml] def getDMLScript(scriptPath: String): String = { var reader: BufferedReader = null - val out = new StringBuilder() + val out = new StringBuilder() try { val in = { if (systemmlHome == null || systemmlHome.equals("")) { @@ -60,7 +60,7 @@ object ScriptsUtils { } } var reader = new BufferedReader(new InputStreamReader(in)) - var line = reader.readLine() + var line = reader.readLine() while (line != null) { out.append(line); out.append(System.getProperty("line.separator")); @@ -75,4 +75,4 @@ object ScriptsUtils { } out.toString() } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/systemml/blob/f07b5a2d/src/main/scala/org/apache/sysml/api/ml/Utils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/Utils.scala b/src/main/scala/org/apache/sysml/api/ml/Utils.scala index a804f64..77bd17a 100644 --- a/src/main/scala/org/apache/sysml/api/ml/Utils.scala +++ b/src/main/scala/org/apache/sysml/api/ml/Utils.scala @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -27,18 +27,17 @@ object Utils { val originalErr = System.err } class Utils { - def checkIfFileExists(filePath:String):Boolean = { + def checkIfFileExists(filePath: String): Boolean = return org.apache.sysml.runtime.util.MapReduceTool.existsFileOnHDFS(filePath) - } - + // -------------------------------------------------------------------------------- // Simple utility function to print the information about our binary blocked format - def getBinaryBlockInfo(binaryBlocks:JavaPairRDD[MatrixIndexes, MatrixBlock]):String = { - val sb = new StringBuilder + def getBinaryBlockInfo(binaryBlocks: JavaPairRDD[MatrixIndexes, MatrixBlock]): String = { + val sb = new StringBuilder var partitionIndex = 0 - for(str <- binaryBlocks.rdd.mapPartitions(binaryBlockIteratorToString(_), true).collect) { + for (str <- binaryBlocks.rdd.mapPartitions(binaryBlockIteratorToString(_), true).collect) { sb.append("-------------------------------------\n") - sb.append("Partition " + partitionIndex + ":\n") + sb.append("Partition " + partitionIndex + ":\n") sb.append(str) partitionIndex = partitionIndex + 1 } @@ -47,40 +46,40 @@ class Utils { } def binaryBlockIteratorToString(it: Iterator[(MatrixIndexes, MatrixBlock)]): Iterator[String] = { val sb = new StringBuilder - for(entry <- it) { + for (entry <- it) { val mi = entry._1 val mb = entry._2 sb.append(mi.toString); - sb.append(" sparse? = "); - sb.append(mb.isInSparseFormat()); - if(mb.isUltraSparse) - sb.append(" (ultra-sparse)") - sb.append(", nonzeros = "); - sb.append(mb.getNonZeros); - sb.append(", dimensions = "); - sb.append(mb.getNumRows); - sb.append(" X "); - sb.append(mb.getNumColumns); - sb.append("\n"); + sb.append(" sparse? = "); + sb.append(mb.isInSparseFormat()); + if (mb.isUltraSparse) + sb.append(" (ultra-sparse)") + sb.append(", nonzeros = "); + sb.append(mb.getNonZeros); + sb.append(", dimensions = "); + sb.append(mb.getNumRows); + sb.append(" X "); + sb.append(mb.getNumColumns); + sb.append("\n"); } List[String](sb.toString).iterator } val baos = new java.io.ByteArrayOutputStream() val baes = new java.io.ByteArrayOutputStream() - def startRedirectStdOut():Unit = { + def startRedirectStdOut(): Unit = { System.setOut(new java.io.PrintStream(baos)); System.setErr(new java.io.PrintStream(baes)); } - def flushStdOut():String = { + def flushStdOut(): String = { val ret = baos.toString() + baes.toString() baos.reset(); baes.reset() return ret } - def stopRedirectStdOut():String = { + def stopRedirectStdOut(): String = { val ret = baos.toString() + baes.toString() System.setOut(Utils.originalOut) System.setErr(Utils.originalErr) return ret } // -------------------------------------------------------------------------------- -} \ No newline at end of file +}