Repository: incubator-systemml
Updated Branches:
  refs/heads/master bc2231982 -> 8a6b3856c
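For readers who want to try the refactored API directly, here is a minimal usage sketch of the Scala estimator (org.apache.sysml.api.ml.LogisticRegression) that this commit promotes to the default. It is adapted from the LogisticRegressionExample object added in the diff below; the driver object name is hypothetical, and it assumes a local Spark master and that 'MultiLogReg.dml' and 'GLM-predict.dml' are resolvable (e.g. via SYSTEMML_HOME or from the jar resources).

  // Hypothetical driver object, not part of the commit; a minimal sketch only.
  import org.apache.spark.{ SparkConf, SparkContext }
  import org.apache.spark.mllib.linalg.Vectors
  import org.apache.spark.mllib.regression.LabeledPoint
  import org.apache.sysml.api.ml.LogisticRegression

  object LogisticRegressionQuickStart {
    def main(args: Array[String]): Unit = {
      val sc = new SparkContext("local", "QuickStart", new SparkConf())
      val sqlContext = new org.apache.spark.sql.SQLContext(sc)
      import sqlContext.implicits._

      // MultiLogReg.dml expects 1-based labels
      val training = sc.parallelize(Seq(
        LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
        LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)),
        LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)),
        LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6))))

      // uid and SparkContext are passed explicitly, as in the new Scala API
      val lr = new LogisticRegression("log", sc).setRegParam(0.01).setTol(1e-6)
      val model = lr.fit(training.toDF)

      model.mloutput.getDF(sqlContext, "B_out").show()  // learned coefficients
      model.transform(training.toDF).show()             // predictions and probabilities
    }
  }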
[SYSTEMML-578] Refactor ml api package to set Scala version as default Closes #116. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/8a6b3856 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/8a6b3856 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/8a6b3856 Branch: refs/heads/master Commit: 8a6b3856ccbb65ea974e51636a60c078805c258f Parents: bc22319 Author: Tommy YU <tumm...@163.com> Authored: Thu Apr 21 14:31:30 2016 -0700 Committer: Deron Eriksson <de...@us.ibm.com> Committed: Thu Apr 21 14:31:30 2016 -0700 ---------------------------------------------------------------------- .../sysml/api/javaml/LogisticRegression.java | 473 ++++++++++++++++++ .../api/javaml/LogisticRegressionModel.java | 179 +++++++ .../apache/sysml/api/ml/LogisticRegression.java | 474 ------------------- .../sysml/api/ml/LogisticRegressionModel.java | 179 ------- .../sysml/api/ml/LogisticRegression.scala | 204 ++++++++ .../org/apache/sysml/api/ml/ScriptsUtils.scala | 75 +++ .../sysml/api/ml/scala/LogisticRegression.scala | 169 ------- .../sysml/api/ml/scala/ScriptsUtils.scala | 62 --- .../sysml/api/ml/LogisticRegressionSuite.scala | 51 ++ .../sysml/api/ml/WrapperSparkContext.scala | 54 +++ .../api/ml/scala/LogisticRegressionSuite.scala | 50 -- .../api/ml/scala/WrapperSparkContext.scala | 55 --- 12 files changed, 1036 insertions(+), 989 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/java/org/apache/sysml/api/javaml/LogisticRegression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/javaml/LogisticRegression.java b/src/main/java/org/apache/sysml/api/javaml/LogisticRegression.java new file mode 100644 index 0000000..dbcc118 --- /dev/null +++ b/src/main/java/org/apache/sysml/api/javaml/LogisticRegression.java @@ -0,0 +1,473 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.api.javaml; + +import java.io.File; +import java.util.HashMap; + +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.ml.classification.LogisticRegressionParams; +import org.apache.spark.ml.classification.ProbabilisticClassifier; +import org.apache.spark.ml.param.BooleanParam; +import org.apache.spark.ml.param.DoubleParam; +import org.apache.spark.ml.param.IntParam; +import org.apache.spark.ml.param.StringArrayParam; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; +import org.apache.sysml.api.MLContext; +import org.apache.sysml.api.MLOutput; +import org.apache.sysml.api.javaml.LogisticRegressionModel; +import org.apache.sysml.api.ml.functions.ConvertSingleColumnToString; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; + +/** + * + * This class shows how SystemML can be integrated into an ML pipeline. Note that it has not been optimized for performance and + * is implemented as a proof of concept. An optimized pipeline can be constructed using DML's 'parfor' construct. + * + * TODO: + * - Please note that this class expects 1-based labels. To run the example below, + * set the environment variable 'SYSTEMML_HOME', create a folder 'algorithms', + * and place at least the two scripts 'MultiLogReg.dml' and 'GLM-predict.dml' in that folder. + * - It is not yet optimized for performance.
+ * - Also, it needs to be extended to surface all the parameters of MultiLogReg.dml + * + * Example usage: + * <pre><code> + * // Code to demonstrate usage of pipeline + * import org.apache.spark.ml.Pipeline + * import org.apache.sysml.api.ml.LogisticRegression + * import org.apache.spark.ml.feature.{HashingTF, Tokenizer} + * import org.apache.spark.mllib.linalg.Vector + * case class LabeledDocument(id: Long, text: String, label: Double) + * case class Document(id: Long, text: String) + * val training = sc.parallelize(Seq( + * LabeledDocument(0L, "a b c d e spark", 1.0), + * LabeledDocument(1L, "b d", 2.0), + * LabeledDocument(2L, "spark f g h", 1.0), + * LabeledDocument(3L, "hadoop mapreduce", 2.0), + * LabeledDocument(4L, "b spark who", 1.0), + * LabeledDocument(5L, "g d a y", 2.0), + * LabeledDocument(6L, "spark fly", 1.0), + * LabeledDocument(7L, "was mapreduce", 2.0), + * LabeledDocument(8L, "e spark program", 1.0), + * LabeledDocument(9L, "a e c l", 2.0), + * LabeledDocument(10L, "spark compile", 1.0), + * LabeledDocument(11L, "hadoop software", 2.0))) + * val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") + * val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") + * val lr = new LogisticRegression(sc, sqlContext) + * val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) + * val model = pipeline.fit(training.toDF) + * val test = sc.parallelize(Seq( + * Document(12L, "spark i j k"), + * Document(13L, "l m n"), + * Document(14L, "mapreduce spark"), + * Document(15L, "apache hadoop"))) + * model.transform(test.toDF).show + * + * // Code to demonstrate usage of cross-validation + * import org.apache.spark.ml.Pipeline + * import org.apache.sysml.api.ml.LogisticRegression + * import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator + * import org.apache.spark.ml.feature.{HashingTF, Tokenizer} + * import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} + * import org.apache.spark.mllib.linalg.Vector + * case class LabeledDocument(id: Long, text: String, label: Double) + * case class Document(id: Long, text: String) + * val training = sc.parallelize(Seq( + * LabeledDocument(0L, "a b c d e spark", 1.0), + * LabeledDocument(1L, "b d", 2.0), + * LabeledDocument(2L, "spark f g h", 1.0), + * LabeledDocument(3L, "hadoop mapreduce", 2.0), + * LabeledDocument(4L, "b spark who", 1.0), + * LabeledDocument(5L, "g d a y", 2.0), + * LabeledDocument(6L, "spark fly", 1.0), + * LabeledDocument(7L, "was mapreduce", 2.0), + * LabeledDocument(8L, "e spark program", 1.0), + * LabeledDocument(9L, "a e c l", 2.0), + * LabeledDocument(10L, "spark compile", 1.0), + * LabeledDocument(11L, "hadoop software", 2.0))) + * val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") + * val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") + * val lr = new LogisticRegression(sc, sqlContext) + * val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) + * val crossval = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator) + * val paramGrid = new ParamGridBuilder().addGrid(hashingTF.numFeatures, Array(10, 100, 1000)).addGrid(lr.regParam, Array(0.1, 0.01)).build() + * crossval.setEstimatorParamMaps(paramGrid) + * crossval.setNumFolds(2) + * val cvModel = crossval.fit(training.toDF) + * val test = sc.parallelize(Seq( + * Document(12L, "spark i j k"), + * Document(13L, "l m 
n"), + * Document(14L, "mapreduce spark"), + * Document(15L, "apache hadoop"))) + * cvModel.transform(test.toDF).show + * </code></pre> + * + */ +public class LogisticRegression extends ProbabilisticClassifier<Vector, LogisticRegression, LogisticRegressionModel> + implements LogisticRegressionParams { + + private static final long serialVersionUID = 7763813395635870734L; + + private SparkContext sc = null; + private SQLContext sqlContext = null; + private HashMap<String, String> cmdLineParams = new HashMap<String, String>(); + + private IntParam icpt = new IntParam(this, "icpt", "Value of intercept"); + private DoubleParam reg = new DoubleParam(this, "reg", "Value of regularization parameter"); + private DoubleParam tol = new DoubleParam(this, "tol", "Value of tolerance"); + private IntParam moi = new IntParam(this, "moi", "Max outer iterations"); + private IntParam mii = new IntParam(this, "mii", "Max inner iterations"); + private IntParam labelIndex = new IntParam(this, "li", "Index of the label column"); + private StringArrayParam inputCol = new StringArrayParam(this, "icname", "Feature column name"); + private StringArrayParam outputCol = new StringArrayParam(this, "ocname", "Label column name"); + private int intMin = Integer.MIN_VALUE; + @SuppressWarnings("unused") + private int li = 0; + private String[] icname = new String[1]; + private String[] ocname = new String[1]; + + public LogisticRegression() { + } + + public LogisticRegression(String uid) { + } + + @Override + public LogisticRegression copy(org.apache.spark.ml.param.ParamMap paramMap) { + try { + // Copy deals with command-line parameter of script MultiLogReg.dml + LogisticRegression lr = new LogisticRegression(sc, sqlContext); + lr.cmdLineParams.put(icpt.name(), paramMap.getOrElse(icpt, 0).toString()); + lr.cmdLineParams.put(reg.name(), paramMap.getOrElse(reg, 0.0f).toString()); + lr.cmdLineParams.put(tol.name(), paramMap.getOrElse(tol, 0.000001f).toString()); + lr.cmdLineParams.put(moi.name(), paramMap.getOrElse(moi, 100).toString()); + lr.cmdLineParams.put(mii.name(), paramMap.getOrElse(mii, 0).toString()); + + return lr; + } catch (DMLRuntimeException e) { + e.printStackTrace(); + } + return null; + + } + + public LogisticRegression(SparkContext sc, SQLContext sqlContext) throws DMLRuntimeException { + this.sc = sc; + this.sqlContext = sqlContext; + + setDefault(intercept(), 0); + cmdLineParams.put(icpt.name(), "0"); + setDefault(regParam(), 0.0f); + cmdLineParams.put(reg.name(), "0.0f"); + setDefault(tol(), 0.000001f); + cmdLineParams.put(tol.name(), "0.000001f"); + setDefault(maxOuterIter(), 100); + cmdLineParams.put(moi.name(), "100"); + setDefault(maxInnerIter(), 0); + cmdLineParams.put(mii.name(), "0"); + setDefault(labelIdx(), intMin); + li = intMin; + setDefault(inputCol(), icname); + icname[0] = ""; + setDefault(outputCol(), ocname); + ocname[0] = ""; + } + + public LogisticRegression(SparkContext sc, SQLContext sqlContext, int icpt, double reg, double tol, int moi, int mii) throws DMLRuntimeException { + this.sc = sc; + this.sqlContext = sqlContext; + + setDefault(intercept(), icpt); + cmdLineParams.put(this.icpt.name(), Integer.toString(icpt)); + setDefault(regParam(), reg); + cmdLineParams.put(this.reg.name(), Double.toString(reg)); + setDefault(tol(), tol); + cmdLineParams.put(this.tol.name(), Double.toString(tol)); + setDefault(maxOuterIter(), moi); + cmdLineParams.put(this.moi.name(), Integer.toString(moi)); + setDefault(maxInnerIter(), mii); + cmdLineParams.put(this.mii.name(), Integer.toString(mii)); + 
setDefault(labelIdx(), intMin); + li = intMin; + setDefault(inputCol(), icname); + icname[0] = ""; + setDefault(outputCol(), ocname); + ocname[0] = ""; + } + + @Override + public String uid() { + return Long.toString(LogisticRegression.serialVersionUID); + } + + public LogisticRegression setRegParam(double value) { + cmdLineParams.put(reg.name(), Double.toString(value)); + return (LogisticRegression) setDefault(reg, value); + } + + @Override + public org.apache.spark.sql.types.StructType validateAndTransformSchema(org.apache.spark.sql.types.StructType arg0, boolean arg1, org.apache.spark.sql.types.DataType arg2) { + return null; + } + + @Override + public double getRegParam() { + return Double.parseDouble(cmdLineParams.get(reg.name())); + } + + @Override + public void org$apache$spark$ml$param$shared$HasRegParam$_setter_$regParam_$eq(DoubleParam arg0) { + + } + + @Override + public DoubleParam regParam() { + return reg; + } + + @Override + public DoubleParam elasticNetParam() { + return null; + } + + @Override + public double getElasticNetParam() { + return 0.0f; + } + + @Override + public void org$apache$spark$ml$param$shared$HasElasticNetParam$_setter_$elasticNetParam_$eq(DoubleParam arg0) { + + } + + @Override + public int getMaxIter() { + return 0; + } + + @Override + public IntParam maxIter() { + return null; + } + + public LogisticRegression setMaxOuterIter(int value) { + cmdLineParams.put(moi.name(), Integer.toString(value)); + return (LogisticRegression) setDefault(moi, value); + } + + public int getMaxOuterIter() { + return Integer.parseInt(cmdLineParams.get(moi.name())); + } + + public IntParam maxOuterIter() { + return this.moi; + } + + public LogisticRegression setMaxInnerIter(int value) { + cmdLineParams.put(mii.name(), Integer.toString(value)); + return (LogisticRegression) setDefault(mii, value); + } + + public int getMaxInnerIter() { + return Integer.parseInt(cmdLineParams.get(mii.name())); + } + + public IntParam maxInnerIter() { + return mii; + } + + @Override + public void org$apache$spark$ml$param$shared$HasMaxIter$_setter_$maxIter_$eq(IntParam arg0) { + + } + + public LogisticRegression setIntercept(int value) { + cmdLineParams.put(icpt.name(), Integer.toString(value)); + return (LogisticRegression) setDefault(icpt, value); + } + + public int getIntercept() { + return Integer.parseInt(cmdLineParams.get(icpt.name())); + } + + public IntParam intercept() { + return icpt; + } + + @Override + public BooleanParam fitIntercept() { + return null; + } + + @Override + public boolean getFitIntercept() { + return false; + } + + @Override + public void org$apache$spark$ml$param$shared$HasFitIntercept$_setter_$fitIntercept_$eq(BooleanParam arg0) { + + } + + public LogisticRegression setTol(double value) { + cmdLineParams.put(tol.name(), Double.toString(value)); + return (LogisticRegression) setDefault(tol, value); + } + + @Override + public double getTol() { + return Double.parseDouble(cmdLineParams.get(tol.name())); + } + + @Override + public void org$apache$spark$ml$param$shared$HasTol$_setter_$tol_$eq(DoubleParam arg0) { + + } + + @Override + public DoubleParam tol() { + return tol; + } + + @Override + public double getThreshold() { + return 0; + } + + @Override + public void org$apache$spark$ml$param$shared$HasThreshold$_setter_$threshold_$eq(DoubleParam arg0) { + + } + + @Override + public DoubleParam threshold() { + return null; + } + + public LogisticRegression setLabelIndex(int value) { + li = value; + return (LogisticRegression) setDefault(labelIndex, value); + } + + 
public int getLabelIndex() { + return Integer.parseInt(cmdLineParams.get(labelIndex.name())); + } + + public IntParam labelIdx() { + return labelIndex; + } + + public LogisticRegression setInputCol(String[] value) { + icname[0] = value[0]; + return (LogisticRegression) setDefault(inputCol, value); + } + + public String getInputCol() { + return icname[0]; + } + + public StringArrayParam inputCol() { + return inputCol; + } + + public LogisticRegression setOutputCol(String[] value) { + ocname[0] = value[0]; + return (LogisticRegression) setDefault(outputCol, value); + } + + public String getOutputCol() { + return ocname[0]; + } + + public StringArrayParam outputCol() { + return outputCol; + } + + @Override + public LogisticRegressionModel train(DataFrame df) { + MLContext ml = null; + MLOutput out = null; + + try { + ml = new MLContext(this.sc); + } catch (DMLRuntimeException e1) { + e1.printStackTrace(); + return null; + } + + // Convert input data to format that SystemML accepts + MatrixCharacteristics mcXin = new MatrixCharacteristics(); + JavaPairRDD<MatrixIndexes, MatrixBlock> Xin; + try { + Xin = RDDConverterUtilsExt.vectorDataFrameToBinaryBlock(new JavaSparkContext(this.sc), df, mcXin, false, "features"); + } catch (DMLRuntimeException e1) { + e1.printStackTrace(); + return null; + } + + JavaRDD<String> yin = df.select("label").rdd().toJavaRDD().map(new ConvertSingleColumnToString()); + + try { + // Register the input/output variables of script 'MultiLogReg.dml' + ml.registerInput("X", Xin, mcXin); + ml.registerInput("Y_vec", yin, "csv"); + ml.registerOutput("B_out"); + + // Or add ifdef in MultiLogReg.dml + cmdLineParams.put("X", " "); + cmdLineParams.put("Y", " "); + cmdLineParams.put("B", " "); + + + // ------------------------------------------------------------------------------------ + // Please note that this logic is subject to change and is put as a placeholder + String systemmlHome = System.getenv("SYSTEMML_HOME"); + if(systemmlHome == null) { + System.err.println("ERROR: The environment variable SYSTEMML_HOME is not set."); + return null; + } + + String dmlFilePath = systemmlHome + File.separator + "algorithms" + File.separator + "MultiLogReg.dml"; + // ------------------------------------------------------------------------------------ + + synchronized(MLContext.class) { + // static synchronization is necessary before execute call + out = ml.execute(dmlFilePath, cmdLineParams); + } + + JavaPairRDD<MatrixIndexes, MatrixBlock> b_out = out.getBinaryBlockedRDD("B_out"); + MatrixCharacteristics b_outMC = out.getMatrixCharacteristics("B_out"); + return new LogisticRegressionModel(b_out, b_outMC, sc).setParent(this); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/java/org/apache/sysml/api/javaml/LogisticRegressionModel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/javaml/LogisticRegressionModel.java b/src/main/java/org/apache/sysml/api/javaml/LogisticRegressionModel.java new file mode 100644 index 0000000..819380c --- /dev/null +++ b/src/main/java/org/apache/sysml/api/javaml/LogisticRegressionModel.java @@ -0,0 +1,179 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.api.javaml; + +import java.io.File; +import java.util.HashMap; + +import org.apache.spark.SparkContext; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.ml.classification.ProbabilisticClassificationModel; +import org.apache.spark.ml.param.ParamMap; +import org.apache.spark.mllib.linalg.Vector; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.SQLContext; + +import org.apache.sysml.api.MLContext; +import org.apache.sysml.api.MLOutput; +import org.apache.sysml.runtime.DMLRuntimeException; +import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt; +import org.apache.sysml.runtime.matrix.MatrixCharacteristics; +import org.apache.sysml.runtime.matrix.data.MatrixBlock; +import org.apache.sysml.runtime.matrix.data.MatrixIndexes; + +public class LogisticRegressionModel extends ProbabilisticClassificationModel<Vector, LogisticRegressionModel> { + + private static final long serialVersionUID = -6464693773946415027L; + private JavaPairRDD<MatrixIndexes, MatrixBlock> b_out; + private SparkContext sc; + private MatrixCharacteristics b_outMC; + @Override + public LogisticRegressionModel copy(ParamMap paramMap) { + return this; + } + + public LogisticRegressionModel(JavaPairRDD<MatrixIndexes, MatrixBlock> b_out2, MatrixCharacteristics b_outMC, SparkContext sc) { + this.b_out = b_out2; + this.b_outMC = b_outMC; + this.sc = sc; + //this.cmdLineParams = cmdLineParams; + } + + public LogisticRegressionModel() { + } + + public LogisticRegressionModel(String uid) { + } + + @Override + public String uid() { + return Long.toString(LogisticRegressionModel.serialVersionUID); + } + + @Override + public Vector raw2probabilityInPlace(Vector arg0) { + return arg0; + } + + @Override + public int numClasses() { + return 2; + } + + @Override + public Vector predictRaw(Vector arg0) { + return arg0; + } + + + @Override + public double predict(Vector features) { + return super.predict(features); + } + + @Override + public double raw2prediction(Vector rawPrediction) { + return super.raw2prediction(rawPrediction); + } + + @Override + public double probability2prediction(Vector probability) { + return super.probability2prediction(probability); + } + + public static class ConvertIntToRow implements Function<Integer, Row> { + + private static final long serialVersionUID = -3480953015655773622L; + + @Override + public Row call(Integer arg0) throws Exception { + Object[] row_fields = new Object[1]; + row_fields[0] = new Double(arg0); + return RowFactory.create(row_fields); + } + + } + + @Override + public DataFrame transform(DataFrame dataset) { + try { + MatrixCharacteristics mcXin = new MatrixCharacteristics(); + JavaPairRDD<MatrixIndexes, MatrixBlock> Xin; + try { + Xin = 
RDDConverterUtilsExt.vectorDataFrameToBinaryBlock(new JavaSparkContext(this.sc), dataset, mcXin, false, "features"); + } catch (DMLRuntimeException e1) { + e1.printStackTrace(); + return null; + } + MLContext ml = new MLContext(sc); + ml.registerInput("X", Xin, mcXin); + ml.registerInput("B_full", b_out, b_outMC); // Changed MLContext for this method + ml.registerOutput("means"); + HashMap<String, String> param = new HashMap<String, String>(); + param.put("dfam", "3"); + + // ------------------------------------------------------------------------------------ + // Please note that this logic is subject to change and is put as a placeholder + String systemmlHome = System.getenv("SYSTEMML_HOME"); + if(systemmlHome == null) { + System.err.println("ERROR: The environment variable SYSTEMML_HOME is not set."); + return null; + } + // Or add ifdef in GLM-predict.dml + param.put("X", " "); + param.put("B", " "); + + String dmlFilePath = systemmlHome + File.separator + "algorithms" + File.separator + "GLM-predict.dml"; + // ------------------------------------------------------------------------------------ + MLOutput out = ml.execute(dmlFilePath, param); + + SQLContext sqlContext = new SQLContext(sc); + DataFrame prob = out.getDF(sqlContext, "means", true).withColumnRenamed("C1", "probability"); + + MLContext mlNew = new MLContext(sc); + mlNew.registerInput("X", Xin, mcXin); + mlNew.registerInput("B_full", b_out, b_outMC); + mlNew.registerInput("Prob", out.getBinaryBlockedRDD("means"), out.getMatrixCharacteristics("means")); + mlNew.registerOutput("Prediction"); + mlNew.registerOutput("rawPred"); + MLOutput outNew = mlNew.executeScript("Prob = read(\"temp1\"); " + + "Prediction = rowIndexMax(Prob); " + + "write(Prediction, \"tempOut\", \"csv\")" + + "X = read(\"temp2\");" + + "B_full = read(\"temp3\");" + + "rawPred = 1 / (1 + exp(- X * t(B_full)) );" // Raw prediction logic: + + "write(rawPred, \"tempOut1\", \"csv\")"); + + // TODO: Perform joins in the DML + DataFrame pred = outNew.getDF(sqlContext, "Prediction").withColumnRenamed("C1", "prediction").withColumnRenamed("ID", "ID1"); + DataFrame rawPred = outNew.getDF(sqlContext, "rawPred", true).withColumnRenamed("C1", "rawPrediction").withColumnRenamed("ID", "ID2"); + DataFrame predictionsNProb = prob.join(pred, prob.col("ID").equalTo(pred.col("ID1"))).select("ID", "probability", "prediction"); + predictionsNProb = predictionsNProb.join(rawPred, predictionsNProb.col("ID").equalTo(rawPred.col("ID2"))).select("ID", "probability", "prediction", "rawPrediction"); + DataFrame dataset1 = RDDConverterUtilsExt.addIDToDataFrame(dataset, sqlContext, "ID"); + return dataset1.join(predictionsNProb, dataset1.col("ID").equalTo(predictionsNProb.col("ID"))).orderBy("id"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/java/org/apache/sysml/api/ml/LogisticRegression.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/ml/LogisticRegression.java b/src/main/java/org/apache/sysml/api/ml/LogisticRegression.java deleted file mode 100644 index 5c643ca..0000000 --- a/src/main/java/org/apache/sysml/api/ml/LogisticRegression.java +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.api.ml; - -import java.io.File; -import java.util.HashMap; - -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.ml.classification.LogisticRegressionParams; -import org.apache.spark.ml.classification.ProbabilisticClassifier; -import org.apache.spark.ml.param.BooleanParam; -import org.apache.spark.ml.param.DoubleParam; -import org.apache.spark.ml.param.IntParam; -import org.apache.spark.ml.param.StringArrayParam; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.SQLContext; - -import org.apache.sysml.api.MLContext; -import org.apache.sysml.api.MLOutput; -import org.apache.sysml.api.ml.LogisticRegressionModel; -import org.apache.sysml.api.ml.functions.ConvertSingleColumnToString; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; - -/** - * - * This class shows how SystemML can be integrated into MLPipeline. Note, it has not been optimized for performance and - * is implemented as a proof of concept. An optimized pipeline can be constructed by usage of DML's 'parfor' construct. - * - * TODO: - * - Please note that this class expects 1-based labels. To run below example, - * please set environment variable 'SYSTEMML_HOME' and create folder 'algorithms' - * and place atleast two scripts in that folder 'MultiLogReg.dml' and 'GLM-predict.dml' - * - It is not yet optimized for performance. 
- * - Also, it needs to be extended to surface all the parameters of MultiLogReg.dml - * - * Example usage: - * <pre><code> - * // Code to demonstrate usage of pipeline - * import org.apache.spark.ml.Pipeline - * import org.apache.sysml.api.ml.LogisticRegression - * import org.apache.spark.ml.feature.{HashingTF, Tokenizer} - * import org.apache.spark.mllib.linalg.Vector - * case class LabeledDocument(id: Long, text: String, label: Double) - * case class Document(id: Long, text: String) - * val training = sc.parallelize(Seq( - * LabeledDocument(0L, "a b c d e spark", 1.0), - * LabeledDocument(1L, "b d", 2.0), - * LabeledDocument(2L, "spark f g h", 1.0), - * LabeledDocument(3L, "hadoop mapreduce", 2.0), - * LabeledDocument(4L, "b spark who", 1.0), - * LabeledDocument(5L, "g d a y", 2.0), - * LabeledDocument(6L, "spark fly", 1.0), - * LabeledDocument(7L, "was mapreduce", 2.0), - * LabeledDocument(8L, "e spark program", 1.0), - * LabeledDocument(9L, "a e c l", 2.0), - * LabeledDocument(10L, "spark compile", 1.0), - * LabeledDocument(11L, "hadoop software", 2.0))) - * val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") - * val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") - * val lr = new LogisticRegression(sc, sqlContext) - * val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) - * val model = pipeline.fit(training.toDF) - * val test = sc.parallelize(Seq( - * Document(12L, "spark i j k"), - * Document(13L, "l m n"), - * Document(14L, "mapreduce spark"), - * Document(15L, "apache hadoop"))) - * model.transform(test.toDF).show - * - * // Code to demonstrate usage of cross-validation - * import org.apache.spark.ml.Pipeline - * import org.apache.sysml.api.ml.LogisticRegression - * import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator - * import org.apache.spark.ml.feature.{HashingTF, Tokenizer} - * import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator} - * import org.apache.spark.mllib.linalg.Vector - * case class LabeledDocument(id: Long, text: String, label: Double) - * case class Document(id: Long, text: String) - * val training = sc.parallelize(Seq( - * LabeledDocument(0L, "a b c d e spark", 1.0), - * LabeledDocument(1L, "b d", 2.0), - * LabeledDocument(2L, "spark f g h", 1.0), - * LabeledDocument(3L, "hadoop mapreduce", 2.0), - * LabeledDocument(4L, "b spark who", 1.0), - * LabeledDocument(5L, "g d a y", 2.0), - * LabeledDocument(6L, "spark fly", 1.0), - * LabeledDocument(7L, "was mapreduce", 2.0), - * LabeledDocument(8L, "e spark program", 1.0), - * LabeledDocument(9L, "a e c l", 2.0), - * LabeledDocument(10L, "spark compile", 1.0), - * LabeledDocument(11L, "hadoop software", 2.0))) - * val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words") - * val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol(tokenizer.getOutputCol).setOutputCol("features") - * val lr = new LogisticRegression(sc, sqlContext) - * val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr)) - * val crossval = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator) - * val paramGrid = new ParamGridBuilder().addGrid(hashingTF.numFeatures, Array(10, 100, 1000)).addGrid(lr.regParam, Array(0.1, 0.01)).build() - * crossval.setEstimatorParamMaps(paramGrid) - * crossval.setNumFolds(2) - * val cvModel = crossval.fit(training.toDF) - * val test = sc.parallelize(Seq( - * Document(12L, "spark i j k"), - * Document(13L, "l m 
n"), - * Document(14L, "mapreduce spark"), - * Document(15L, "apache hadoop"))) - * cvModel.transform(test.toDF).show - * </code></pre> - * - */ -public class LogisticRegression extends ProbabilisticClassifier<Vector, LogisticRegression, LogisticRegressionModel> - implements LogisticRegressionParams { - - private static final long serialVersionUID = 7763813395635870734L; - - private SparkContext sc = null; - private SQLContext sqlContext = null; - private HashMap<String, String> cmdLineParams = new HashMap<String, String>(); - - private IntParam icpt = new IntParam(this, "icpt", "Value of intercept"); - private DoubleParam reg = new DoubleParam(this, "reg", "Value of regularization parameter"); - private DoubleParam tol = new DoubleParam(this, "tol", "Value of tolerance"); - private IntParam moi = new IntParam(this, "moi", "Max outer iterations"); - private IntParam mii = new IntParam(this, "mii", "Max inner iterations"); - private IntParam labelIndex = new IntParam(this, "li", "Index of the label column"); - private StringArrayParam inputCol = new StringArrayParam(this, "icname", "Feature column name"); - private StringArrayParam outputCol = new StringArrayParam(this, "ocname", "Label column name"); - private int intMin = Integer.MIN_VALUE; - @SuppressWarnings("unused") - private int li = 0; - private String[] icname = new String[1]; - private String[] ocname = new String[1]; - - public LogisticRegression() { - } - - public LogisticRegression(String uid) { - } - - @Override - public LogisticRegression copy(org.apache.spark.ml.param.ParamMap paramMap) { - try { - // Copy deals with command-line parameter of script MultiLogReg.dml - LogisticRegression lr = new LogisticRegression(sc, sqlContext); - lr.cmdLineParams.put(icpt.name(), paramMap.getOrElse(icpt, 0).toString()); - lr.cmdLineParams.put(reg.name(), paramMap.getOrElse(reg, 0.0f).toString()); - lr.cmdLineParams.put(tol.name(), paramMap.getOrElse(tol, 0.000001f).toString()); - lr.cmdLineParams.put(moi.name(), paramMap.getOrElse(moi, 100).toString()); - lr.cmdLineParams.put(mii.name(), paramMap.getOrElse(mii, 0).toString()); - - return lr; - } catch (DMLRuntimeException e) { - e.printStackTrace(); - } - return null; - - } - - public LogisticRegression(SparkContext sc, SQLContext sqlContext) throws DMLRuntimeException { - this.sc = sc; - this.sqlContext = sqlContext; - - setDefault(intercept(), 0); - cmdLineParams.put(icpt.name(), "0"); - setDefault(regParam(), 0.0f); - cmdLineParams.put(reg.name(), "0.0f"); - setDefault(tol(), 0.000001f); - cmdLineParams.put(tol.name(), "0.000001f"); - setDefault(maxOuterIter(), 100); - cmdLineParams.put(moi.name(), "100"); - setDefault(maxInnerIter(), 0); - cmdLineParams.put(mii.name(), "0"); - setDefault(labelIdx(), intMin); - li = intMin; - setDefault(inputCol(), icname); - icname[0] = ""; - setDefault(outputCol(), ocname); - ocname[0] = ""; - } - - public LogisticRegression(SparkContext sc, SQLContext sqlContext, int icpt, double reg, double tol, int moi, int mii) throws DMLRuntimeException { - this.sc = sc; - this.sqlContext = sqlContext; - - setDefault(intercept(), icpt); - cmdLineParams.put(this.icpt.name(), Integer.toString(icpt)); - setDefault(regParam(), reg); - cmdLineParams.put(this.reg.name(), Double.toString(reg)); - setDefault(tol(), tol); - cmdLineParams.put(this.tol.name(), Double.toString(tol)); - setDefault(maxOuterIter(), moi); - cmdLineParams.put(this.moi.name(), Integer.toString(moi)); - setDefault(maxInnerIter(), mii); - cmdLineParams.put(this.mii.name(), Integer.toString(mii)); - 
setDefault(labelIdx(), intMin); - li = intMin; - setDefault(inputCol(), icname); - icname[0] = ""; - setDefault(outputCol(), ocname); - ocname[0] = ""; - } - - @Override - public String uid() { - return Long.toString(LogisticRegression.serialVersionUID); - } - - public LogisticRegression setRegParam(double value) { - cmdLineParams.put(reg.name(), Double.toString(value)); - return (LogisticRegression) setDefault(reg, value); - } - - @Override - public org.apache.spark.sql.types.StructType validateAndTransformSchema(org.apache.spark.sql.types.StructType arg0, boolean arg1, org.apache.spark.sql.types.DataType arg2) { - return null; - } - - @Override - public double getRegParam() { - return Double.parseDouble(cmdLineParams.get(reg.name())); - } - - @Override - public void org$apache$spark$ml$param$shared$HasRegParam$_setter_$regParam_$eq(DoubleParam arg0) { - - } - - @Override - public DoubleParam regParam() { - return reg; - } - - @Override - public DoubleParam elasticNetParam() { - return null; - } - - @Override - public double getElasticNetParam() { - return 0.0f; - } - - @Override - public void org$apache$spark$ml$param$shared$HasElasticNetParam$_setter_$elasticNetParam_$eq(DoubleParam arg0) { - - } - - @Override - public int getMaxIter() { - return 0; - } - - @Override - public IntParam maxIter() { - return null; - } - - public LogisticRegression setMaxOuterIter(int value) { - cmdLineParams.put(moi.name(), Integer.toString(value)); - return (LogisticRegression) setDefault(moi, value); - } - - public int getMaxOuterIter() { - return Integer.parseInt(cmdLineParams.get(moi.name())); - } - - public IntParam maxOuterIter() { - return this.moi; - } - - public LogisticRegression setMaxInnerIter(int value) { - cmdLineParams.put(mii.name(), Integer.toString(value)); - return (LogisticRegression) setDefault(mii, value); - } - - public int getMaxInnerIter() { - return Integer.parseInt(cmdLineParams.get(mii.name())); - } - - public IntParam maxInnerIter() { - return mii; - } - - @Override - public void org$apache$spark$ml$param$shared$HasMaxIter$_setter_$maxIter_$eq(IntParam arg0) { - - } - - public LogisticRegression setIntercept(int value) { - cmdLineParams.put(icpt.name(), Integer.toString(value)); - return (LogisticRegression) setDefault(icpt, value); - } - - public int getIntercept() { - return Integer.parseInt(cmdLineParams.get(icpt.name())); - } - - public IntParam intercept() { - return icpt; - } - - @Override - public BooleanParam fitIntercept() { - return null; - } - - @Override - public boolean getFitIntercept() { - return false; - } - - @Override - public void org$apache$spark$ml$param$shared$HasFitIntercept$_setter_$fitIntercept_$eq(BooleanParam arg0) { - - } - - public LogisticRegression setTol(double value) { - cmdLineParams.put(tol.name(), Double.toString(value)); - return (LogisticRegression) setDefault(tol, value); - } - - @Override - public double getTol() { - return Double.parseDouble(cmdLineParams.get(tol.name())); - } - - @Override - public void org$apache$spark$ml$param$shared$HasTol$_setter_$tol_$eq(DoubleParam arg0) { - - } - - @Override - public DoubleParam tol() { - return tol; - } - - @Override - public double getThreshold() { - return 0; - } - - @Override - public void org$apache$spark$ml$param$shared$HasThreshold$_setter_$threshold_$eq(DoubleParam arg0) { - - } - - @Override - public DoubleParam threshold() { - return null; - } - - public LogisticRegression setLabelIndex(int value) { - li = value; - return (LogisticRegression) setDefault(labelIndex, value); - } - - 
public int getLabelIndex() { - return Integer.parseInt(cmdLineParams.get(labelIndex.name())); - } - - public IntParam labelIdx() { - return labelIndex; - } - - public LogisticRegression setInputCol(String[] value) { - icname[0] = value[0]; - return (LogisticRegression) setDefault(inputCol, value); - } - - public String getInputCol() { - return icname[0]; - } - - public StringArrayParam inputCol() { - return inputCol; - } - - public LogisticRegression setOutputCol(String[] value) { - ocname[0] = value[0]; - return (LogisticRegression) setDefault(outputCol, value); - } - - public String getOutputCol() { - return ocname[0]; - } - - public StringArrayParam outputCol() { - return outputCol; - } - - @Override - public LogisticRegressionModel train(DataFrame df) { - MLContext ml = null; - MLOutput out = null; - - try { - ml = new MLContext(this.sc); - } catch (DMLRuntimeException e1) { - e1.printStackTrace(); - return null; - } - - // Convert input data to format that SystemML accepts - MatrixCharacteristics mcXin = new MatrixCharacteristics(); - JavaPairRDD<MatrixIndexes, MatrixBlock> Xin; - try { - Xin = RDDConverterUtilsExt.vectorDataFrameToBinaryBlock(new JavaSparkContext(this.sc), df, mcXin, false, "features"); - } catch (DMLRuntimeException e1) { - e1.printStackTrace(); - return null; - } - - JavaRDD<String> yin = df.select("label").rdd().toJavaRDD().map(new ConvertSingleColumnToString()); - - try { - // Register the input/output variables of script 'MultiLogReg.dml' - ml.registerInput("X", Xin, mcXin); - ml.registerInput("Y_vec", yin, "csv"); - ml.registerOutput("B_out"); - - // Or add ifdef in MultiLogReg.dml - cmdLineParams.put("X", " "); - cmdLineParams.put("Y", " "); - cmdLineParams.put("B", " "); - - - // ------------------------------------------------------------------------------------ - // Please note that this logic is subject to change and is put as a placeholder - String systemmlHome = System.getenv("SYSTEMML_HOME"); - if(systemmlHome == null) { - System.err.println("ERROR: The environment variable SYSTEMML_HOME is not set."); - return null; - } - - String dmlFilePath = systemmlHome + File.separator + "algorithms" + File.separator + "MultiLogReg.dml"; - // ------------------------------------------------------------------------------------ - - synchronized(MLContext.class) { - // static synchronization is necessary before execute call - out = ml.execute(dmlFilePath, cmdLineParams); - } - - JavaPairRDD<MatrixIndexes, MatrixBlock> b_out = out.getBinaryBlockedRDD("B_out"); - MatrixCharacteristics b_outMC = out.getMatrixCharacteristics("B_out"); - return new LogisticRegressionModel(b_out, b_outMC, sc).setParent(this); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/java/org/apache/sysml/api/ml/LogisticRegressionModel.java ---------------------------------------------------------------------- diff --git a/src/main/java/org/apache/sysml/api/ml/LogisticRegressionModel.java b/src/main/java/org/apache/sysml/api/ml/LogisticRegressionModel.java deleted file mode 100644 index a16e250..0000000 --- a/src/main/java/org/apache/sysml/api/ml/LogisticRegressionModel.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.api.ml; - -import java.io.File; -import java.util.HashMap; - -import org.apache.spark.SparkContext; -import org.apache.spark.api.java.JavaPairRDD; -import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.ml.classification.ProbabilisticClassificationModel; -import org.apache.spark.ml.param.ParamMap; -import org.apache.spark.mllib.linalg.Vector; -import org.apache.spark.sql.DataFrame; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.RowFactory; -import org.apache.spark.sql.SQLContext; - -import org.apache.sysml.api.MLContext; -import org.apache.sysml.api.MLOutput; -import org.apache.sysml.runtime.DMLRuntimeException; -import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt; -import org.apache.sysml.runtime.matrix.MatrixCharacteristics; -import org.apache.sysml.runtime.matrix.data.MatrixBlock; -import org.apache.sysml.runtime.matrix.data.MatrixIndexes; - -public class LogisticRegressionModel extends ProbabilisticClassificationModel<Vector, LogisticRegressionModel> { - - private static final long serialVersionUID = -6464693773946415027L; - private JavaPairRDD<MatrixIndexes, MatrixBlock> b_out; - private SparkContext sc; - private MatrixCharacteristics b_outMC; - @Override - public LogisticRegressionModel copy(ParamMap paramMap) { - return this; - } - - public LogisticRegressionModel(JavaPairRDD<MatrixIndexes, MatrixBlock> b_out2, MatrixCharacteristics b_outMC, SparkContext sc) { - this.b_out = b_out2; - this.b_outMC = b_outMC; - this.sc = sc; - //this.cmdLineParams = cmdLineParams; - } - - public LogisticRegressionModel() { - } - - public LogisticRegressionModel(String uid) { - } - - @Override - public String uid() { - return Long.toString(LogisticRegressionModel.serialVersionUID); - } - - @Override - public Vector raw2probabilityInPlace(Vector arg0) { - return arg0; - } - - @Override - public int numClasses() { - return 2; - } - - @Override - public Vector predictRaw(Vector arg0) { - return arg0; - } - - - @Override - public double predict(Vector features) { - return super.predict(features); - } - - @Override - public double raw2prediction(Vector rawPrediction) { - return super.raw2prediction(rawPrediction); - } - - @Override - public double probability2prediction(Vector probability) { - return super.probability2prediction(probability); - } - - public static class ConvertIntToRow implements Function<Integer, Row> { - - private static final long serialVersionUID = -3480953015655773622L; - - @Override - public Row call(Integer arg0) throws Exception { - Object[] row_fields = new Object[1]; - row_fields[0] = new Double(arg0); - return RowFactory.create(row_fields); - } - - } - - @Override - public DataFrame transform(DataFrame dataset) { - try { - MatrixCharacteristics mcXin = new MatrixCharacteristics(); - JavaPairRDD<MatrixIndexes, MatrixBlock> Xin; - try { - Xin = 
RDDConverterUtilsExt.vectorDataFrameToBinaryBlock(new JavaSparkContext(this.sc), dataset, mcXin, false, "features"); - } catch (DMLRuntimeException e1) { - e1.printStackTrace(); - return null; - } - MLContext ml = new MLContext(sc); - ml.registerInput("X", Xin, mcXin); - ml.registerInput("B_full", b_out, b_outMC); // Changed MLContext for this method - ml.registerOutput("means"); - HashMap<String, String> param = new HashMap<String, String>(); - param.put("dfam", "3"); - - // ------------------------------------------------------------------------------------ - // Please note that this logic is subject to change and is put as a placeholder - String systemmlHome = System.getenv("SYSTEMML_HOME"); - if(systemmlHome == null) { - System.err.println("ERROR: The environment variable SYSTEMML_HOME is not set."); - return null; - } - // Or add ifdef in GLM-predict.dml - param.put("X", " "); - param.put("B", " "); - - String dmlFilePath = systemmlHome + File.separator + "algorithms" + File.separator + "GLM-predict.dml"; - // ------------------------------------------------------------------------------------ - MLOutput out = ml.execute(dmlFilePath, param); - - SQLContext sqlContext = new SQLContext(sc); - DataFrame prob = out.getDF(sqlContext, "means", true).withColumnRenamed("C1", "probability"); - - MLContext mlNew = new MLContext(sc); - mlNew.registerInput("X", Xin, mcXin); - mlNew.registerInput("B_full", b_out, b_outMC); - mlNew.registerInput("Prob", out.getBinaryBlockedRDD("means"), out.getMatrixCharacteristics("means")); - mlNew.registerOutput("Prediction"); - mlNew.registerOutput("rawPred"); - MLOutput outNew = mlNew.executeScript("Prob = read(\"temp1\"); " - + "Prediction = rowIndexMax(Prob); " - + "write(Prediction, \"tempOut\", \"csv\")" - + "X = read(\"temp2\");" - + "B_full = read(\"temp3\");" - + "rawPred = 1 / (1 + exp(- X * t(B_full)) );" // Raw prediction logic: - + "write(rawPred, \"tempOut1\", \"csv\")"); - - // TODO: Perform joins in the DML - DataFrame pred = outNew.getDF(sqlContext, "Prediction").withColumnRenamed("C1", "prediction").withColumnRenamed("ID", "ID1"); - DataFrame rawPred = outNew.getDF(sqlContext, "rawPred", true).withColumnRenamed("C1", "rawPrediction").withColumnRenamed("ID", "ID2"); - DataFrame predictionsNProb = prob.join(pred, prob.col("ID").equalTo(pred.col("ID1"))).select("ID", "probability", "prediction"); - predictionsNProb = predictionsNProb.join(rawPred, predictionsNProb.col("ID").equalTo(rawPred.col("ID2"))).select("ID", "probability", "prediction", "rawPrediction"); - DataFrame dataset1 = RDDConverterUtilsExt.addIDToDataFrame(dataset, sqlContext, "ID"); - return dataset1.join(predictionsNProb, dataset1.col("ID").equalTo(predictionsNProb.col("ID"))).orderBy("id"); - } catch (Exception e) { - throw new RuntimeException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala new file mode 100644 index 0000000..2583088 --- /dev/null +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.api.ml + +import org.apache.sysml.api.{ MLContext, MLOutput } +import org.apache.sysml.runtime.matrix.MatrixCharacteristics +import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils } +import org.apache.spark.{ SparkContext } +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.StructType +import org.apache.spark.ml.{ Model, Estimator } +import org.apache.spark.ml.classification._ +import org.apache.spark.ml.param.{ Params, Param, ParamMap, DoubleParam } +import org.apache.spark.ml.param.shared._ +import org.apache.spark.SparkConf +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint +import scala.reflect.ClassTag + +trait HasIcpt extends Params { + final val icpt: Param[Int] = new Param[Int](this, "icpt", "Intercept presence, shifting and rescaling X columns") + setDefault(icpt, 0) + final def getIcpt: Int = $(icpt) +} +trait HasMaxOuterIter extends Params { + final val maxOuterIter: Param[Int] = new Param[Int](this, "maxOuterIter", "max. number of outer (Newton) iterations") + setDefault(maxOuterIter, 100) + final def getMaxOuterIte: Int = $(maxOuterIter) +} +trait HasMaxInnerIter extends Params { + final val maxInnerIter: Param[Int] = new Param[Int](this, "maxInnerIter", "max. 
number of inner (conjugate gradient) iterations, 0 = no max") + setDefault(maxInnerIter, 0) + final def getMaxInnerIter: Int = $(maxInnerIter) +} +trait HasTol extends Params { + final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms") + setDefault(tol, 0.000001) + final def getTol: Double = $(tol) +} +trait HasRegParam extends Params { + final val regParam: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms") + setDefault(regParam, 0.000001) + final def getRegParam: Double = $(regParam) +} +object LogisticRegression { + final val scriptPath = "MultiLogReg.dml" +} + +/** + * Logistic Regression Scala API + */ +class LogisticRegression(override val uid: String, val sc: SparkContext) extends Estimator[LogisticRegressionModel] with HasIcpt + with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter { + + def setIcpt(value: Int) = set(icpt, value) + def setMaxOuterIter(value: Int) = set(maxOuterIter, value) + def setMaxInnerIter(value: Int) = set(maxInnerIter, value) + def setRegParam(value: Double) = set(regParam, value) + def setTol(value: Double) = set(tol, value) + + override def copy(extra: ParamMap): LogisticRegression = { + val that = new LogisticRegression(uid, sc) + copyValues(that, extra) + } + override def transformSchema(schema: StructType): StructType = schema + override def fit(df: DataFrame): LogisticRegressionModel = { + val ml = new MLContext(df.rdd.sparkContext) + val mcXin = new MatrixCharacteristics() + val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, df, mcXin, false, "features") + val yin = df.select("label").rdd.map { _.apply(0).toString() } + + val mloutput = { + val paramsMap: Map[String, String] = Map( + "icpt" -> this.getIcpt.toString(), + "reg" -> this.getRegParam.toString(), + "tol" -> this.getTol.toString, + "moi" -> this.getMaxOuterIte.toString, + "mii" -> this.getMaxInnerIter.toString, + + "X" -> " ", + "Y" -> " ", + "B" -> " ") + ml.registerInput("X", Xin, mcXin); + ml.registerInput("Y_vec", yin, "csv"); + ml.registerOutput("B_out"); + ml.executeScript(ScriptsUtils.getDMLScript(LogisticRegression.scriptPath), paramsMap) + //ml.execute(ScriptsUtils.resolvePath(LogisticRegression.scriptPath), paramsMap) + } + new LogisticRegressionModel("logisticRegression")(mloutput) + } +} +object LogisticRegressionModel { + final val scriptPath = "GLM-predict.dml" +} + +/** + * Logistic Regression Scala API + */ + +class LogisticRegressionModel( + override val uid: String)( + val mloutput: MLOutput) extends Model[LogisticRegressionModel] with HasIcpt + with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter { + override def copy(extra: ParamMap): LogisticRegressionModel = { + val that = new LogisticRegressionModel(uid)(mloutput) + copyValues(that, extra) + } + override def transformSchema(schema: StructType): StructType = schema + override def transform(df: DataFrame): DataFrame = { + val ml = new MLContext(df.rdd.sparkContext) + + val mcXin = new MatrixCharacteristics() + val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(df.rdd.sparkContext, df, mcXin, false, "features") + + val mlscoreoutput = { + val paramsMap: Map[String, String] = Map( + "X" -> " ", + "B" -> " ") + ml.registerInput("X", Xin, mcXin); + ml.registerInput("B_full", mloutput.getBinaryBlockedRDD("B_out"), mloutput.getMatrixCharacteristics("B_out")); + ml.registerOutput("means"); + ml.executeScript(ScriptsUtils.getDMLScript(LogisticRegressionModel.scriptPath), 
paramsMap) + } + + val prob = mlscoreoutput.getDF(df.sqlContext, "means", true).withColumnRenamed("C1", "probability") + + val mlNew = new MLContext(df.rdd.sparkContext) + mlNew.registerInput("X", Xin, mcXin); + mlNew.registerInput("B_full", mloutput.getBinaryBlockedRDD("B_out"), mloutput.getMatrixCharacteristics("B_out")); + mlNew.registerInput("Prob", mlscoreoutput.getBinaryBlockedRDD("means"), mlscoreoutput.getMatrixCharacteristics("means")); + mlNew.registerOutput("Prediction"); + mlNew.registerOutput("rawPred"); + + val outNew = mlNew.executeScript("Prob = read(\"temp1\"); " + + "Prediction = rowIndexMax(Prob); " + + "write(Prediction, \"tempOut\", \"csv\")" + + "X = read(\"temp2\");" + + "B_full = read(\"temp3\");" + + "rawPred = 1 / (1 + exp(- X * t(B_full)) );" // Raw prediction logic: + + "write(rawPred, \"tempOut1\", \"csv\")"); + + val pred = outNew.getDF(df.sqlContext, "Prediction").withColumnRenamed("C1", "prediction").withColumnRenamed("ID", "ID1") + val rawPred = outNew.getDF(df.sqlContext, "rawPred", true).withColumnRenamed("C1", "rawPrediction").withColumnRenamed("ID", "ID2") + var predictionsNProb = prob.join(pred, prob.col("ID").equalTo(pred.col("ID1"))).select("ID", "probability", "prediction") + predictionsNProb = predictionsNProb.join(rawPred, predictionsNProb.col("ID").equalTo(rawPred.col("ID2"))).select("ID", "probability", "prediction", "rawPrediction") + val dataset1 = RDDConverterUtils.addIDToDataFrame(df, df.sqlContext, "ID") + dataset1.join(predictionsNProb, dataset1.col("ID").equalTo(predictionsNProb.col("ID"))) + } +} + +/** + * Example code for Logistic Regression + */ +object LogisticRegressionExample { + import org.apache.spark.{ SparkConf, SparkContext } + import org.apache.spark.sql.types._ + import org.apache.spark.mllib.linalg.Vectors + import org.apache.spark.mllib.regression.LabeledPoint + + def main(args: Array[String]) = { + val sparkConf: SparkConf = new SparkConf(); + val sc: SparkContext = new SparkContext("local", "TestLocal", sparkConf); + val sqlContext = new org.apache.spark.sql.SQLContext(sc); + + import sqlContext.implicits._ + val training = sc.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), + LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), + LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) + val lr = new LogisticRegression("log", sc) + val lrmodel = lr.fit(training.toDF) + lrmodel.mloutput.getDF(sqlContext, "B_out").show() + + val testing = sc.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), + LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), + LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) + + lrmodel.transform(testing.toDF).show + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala b/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala new file mode 100644 index 0000000..59e2ee3 --- /dev/null +++ b/src/main/scala/org/apache/sysml/api/ml/ScriptsUtils.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + 
* or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.sysml.api.ml + +import java.io.File +import java.io.BufferedReader +import java.io.InputStreamReader +import org.apache.sysml.runtime.DMLRuntimeException + +object ScriptsUtils { + var systemmlHome = System.getenv("SYSTEMML_HOME") + + /** + * set SystemML home + */ + def setSystemmlHome(path: String) { + systemmlHome = path + } + + /* + * Internal function to get dml path + */ + private[sysml] def resolvePath(filename: String): String = { + import java.io.File + ScriptsUtils.systemmlHome + File.separator + "algorithms" + File.separator + filename + } + + /* + * Internal function to get dml string from jar + */ + private[sysml] def getDMLScript(algorithmFileName: String): String = { + var reader: BufferedReader = null + val out = new StringBuilder() + try { + val in = { + if (systemmlHome == "") { + classOf[LogisticRegression].getClassLoader().getResourceAsStream(algorithmFileName) + } else { + new java.io.FileInputStream(resolvePath(algorithmFileName)) + } + } + reader = new BufferedReader(new InputStreamReader(in)) + var line = reader.readLine() + while (line != null) { + out.append(line); + out.append(System.getProperty("line.separator")); + line = reader.readLine() + } + } catch { + case ex: Exception => + throw new DMLRuntimeException("Cannot read the algorithm file " + algorithmFileName, ex) + } finally { + if (reader != null) + reader.close(); + } + out.toString() + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/scala/org/apache/sysml/api/ml/scala/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/scala/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/scala/LogisticRegression.scala deleted file mode 100644 index af48242..0000000 --- a/src/main/scala/org/apache/sysml/api/ml/scala/LogisticRegression.scala +++ /dev/null @@ -1,169 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License.
- */ - -package org.apache.sysml.api.ml.scala - -import org.apache.sysml.api.{MLContext, MLOutput} -import org.apache.sysml.runtime.matrix.MatrixCharacteristics -import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt => RDDConverterUtils } -import org.apache.sysml.runtime.instructions.spark.utils.{RDDConverterUtilsExt => RDDConverterUtils} - -import org.apache.spark.{ SparkContext } -import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.types.StructType -import org.apache.spark.ml.{ Model, Estimator } -import org.apache.spark.ml.classification._ -import org.apache.spark.ml.param.{ Params, Param, ParamMap,DoubleParam } -import org.apache.spark.ml.param.shared._ -import org.apache.spark.SparkConf -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.regression.LabeledPoint - -trait HasIcpt extends Params { - final val icpt: Param[Int] = new Param[Int](this, "icpt", "Intercept presence, shifting and rescaling X columns") - setDefault(icpt, 0) - final def getIcpt: Int = $(icpt) -} -trait HasMaxOuterIter extends Params { - final val maxOuterIter: Param[Int] = new Param[Int](this, "maxOuterIter", "max. number of outer (Newton) iterations") - setDefault(maxOuterIter, 100) - final def getMaxOuterIte: Int = $(maxOuterIter) -} -trait HasMaxInnerIter extends Params { - final val maxInnerIter: Param[Int] = new Param[Int](this, "maxInnerIter", "max. number of inner (conjugate gradient) iterations, 0 = no max") - setDefault(maxInnerIter, 0) - final def getMaxInnerIter: Int = $(maxInnerIter) -} -trait HasTol extends Params { - final val tol: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms") - setDefault(tol,0.000001) - final def getTol: Double = $(tol) -} -trait HasRegParam extends Params { - final val regParam: DoubleParam = new DoubleParam(this, "tol", "the convergence tolerance for iterative algorithms") - setDefault(regParam,0.000001) - final def getRegParam: Double = $(regParam) -} -object LogisticRegression{ - final val scriptPath = "MultiLogReg.dml" -} -class LogisticRegression(override val uid: String,val sc:SparkContext) extends Estimator[LogisticRegressionModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter { - - def setIcpt(value: Int) = set(icpt, value) - def setMaxOuterIter(value: Int) = set(maxOuterIter, value) - def setMaxInnerIter(value: Int) = set(maxInnerIter, value) - def setRegParam(value: Double) = set(regParam, value) - def setTol(value: Double) = set(tol, value) - - override def copy(extra: ParamMap): LogisticRegression = defaultCopy(extra) - override def transformSchema(schema: StructType): StructType = schema - override def fit(df: DataFrame): LogisticRegressionModel = { - val ml = new MLContext(df.rdd.sparkContext) - val mcXin = new MatrixCharacteristics() - val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(sc, df, mcXin, false, "features") - val yin = df.select("label").rdd.map { _.apply(0).toString() } - - val mloutput = { - val paramsMap:Map[String,String] = Map( - "icpt"->this.getIcpt.toString(), - "reg" ->this.getRegParam.toString(), - "tol" ->this.getTol.toString, - "moi" ->this.getMaxOuterIte.toString, - "mii" ->this.getMaxInnerIter.toString, - - "X" -> " ", - "Y" -> " ", - "B" -> " " - ) - ml.registerInput("X", Xin, mcXin); - ml.registerInput("Y_vec", yin, "csv"); - ml.registerOutput("B_out"); - ml.execute(ScriptsUtils.resolvePath(LogisticRegression.scriptPath),paramsMap) - } - new 
LogisticRegressionModel("logisticRegression")(mloutput) - } -} -object LogisticRegressionModel{ - final val scriptPath = "GLM-predict.dml" -} -class LogisticRegressionModel( - override val uid: String)( - val mloutput: MLOutput) extends Model[LogisticRegressionModel] with HasIcpt - with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter { - override def copy(extra: ParamMap): LogisticRegressionModel = defaultCopy(extra) - override def transformSchema(schema: StructType): StructType = schema - override def transform(df: DataFrame): DataFrame = { - val ml = new MLContext(df.rdd.sparkContext) - - val mcXin = new MatrixCharacteristics() - val Xin = RDDConverterUtils.vectorDataFrameToBinaryBlock(df.rdd.sparkContext, df, mcXin, false, "features") - val yin = df.select("label").rdd.map { _.apply(0).toString() } - - val mlscoreoutput = { - val paramsMap:Map[String,String] = Map( - "dfam" -> "3", - "X" -> " ", - "B" -> " " - ) - ml.registerInput("X", Xin, mcXin); - ml.registerInput("B_full", mloutput.getBinaryBlockedRDD("B_out"),mloutput.getMatrixCharacteristics("B_out")); - ml.registerInput("Y", yin,"csv") - ml.registerOutput("means"); - ml.execute(ScriptsUtils.resolvePath(LogisticRegressionModel.scriptPath),paramsMap) - } - - mlscoreoutput.getDF(df.sqlContext, "means", true).withColumnRenamed("C1", "probability") - } -} - -object LogisticRegressionExample { - import org.apache.spark.{ SparkConf, SparkContext } - import org.apache.spark.sql.types._ -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.regression.LabeledPoint - - def main(args: Array[String]) = { - val sparkConf: SparkConf = new SparkConf(); - val sc: SparkContext = new SparkContext("local", "TestLocal", sparkConf); - val sqlContext = new org.apache.spark.sql.SQLContext(sc); - -import sqlContext.implicits._ - val training = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), - LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), - LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) - val lr = new LogisticRegression("log", sc) - val lrmodel = lr.fit(training.toDF) - lrmodel.mloutput.getDF(sqlContext, "B_out").show() - - val testing = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), - LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)), - LabeledPoint(2.0, Vectors.dense(1.6, 0.8, 3.6)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 2.3)))) - - lrmodel.transform(testing.toDF).show - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/main/scala/org/apache/sysml/api/ml/scala/ScriptsUtils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/scala/ScriptsUtils.scala b/src/main/scala/org/apache/sysml/api/ml/scala/ScriptsUtils.scala deleted file mode 100644 index cd9d3dd..0000000 --- a/src/main/scala/org/apache/sysml/api/ml/scala/ScriptsUtils.scala +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.api.ml.scala - -import java.io.File -import java.io.BufferedReader -import java.io.InputStreamReader -import org.apache.sysml.runtime.DMLRuntimeException - -object ScriptsUtils { - var systemmlHome = System.getenv("SYSTEMML_HOME") - def resolvePath(filename:String):String = { - import java.io.File - ScriptsUtils.systemmlHome + File.separator + "algorithms" + File.separator + filename - } - def setSystemmlHome(path:String) { - systemmlHome = path - } - - // Example usage: val dmlScriptForGLM = getDMLScript("GLM") - def getDMLScript(algorithmName:String): String = { - var reader:BufferedReader = null - val out = new StringBuilder() - - try { - val in = classOf[LogisticRegression].getClassLoader().getResourceAsStream(algorithmName + ".dml") - reader = new BufferedReader(new InputStreamReader(in)) - var line = reader.readLine() - while (line != null) { - out.append(line); - out.append(System.getProperty("line.separator")); - line = reader.readLine() - } - } - catch { - case ex: Exception => - throw new DMLRuntimeException("Cannot read the algorithm " + algorithmName, ex) - } - finally { - if(reader != null) - reader.close(); - } - return out.toString() - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala b/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala new file mode 100644 index 0000000..8d5d095 --- /dev/null +++ b/src/test/scala/org/apache/sysml/api/ml/LogisticRegressionSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.api.ml + +import org.scalatest.FunSuite +import org.scalatest.Matchers +import org.apache.spark.Logging +import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.regression.LabeledPoint + +class LogisticRegressionSuite extends FunSuite with WrapperSparkContext with Matchers with Logging { + + test("run logistic regression with default") { + //Make sure system ml home set when run wrapper + val newsqlContext = new org.apache.spark.sql.SQLContext(sc); + + import newsqlContext.implicits._ + val training = sc.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), + LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)))) + val testing = sc.parallelize(Seq( + LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), + LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), + LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)))) + val lr = new LogisticRegression("log", sc) + val lrmodel = lr.fit(training.toDF) + lrmodel.transform(testing.toDF).show + + lr.getIcpt shouldBe 0 + lrmodel.getIcpt shouldBe lr.getIcpt + lrmodel.getMaxInnerIter shouldBe lr.getMaxInnerIter + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala ---------------------------------------------------------------------- diff --git a/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala b/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala new file mode 100644 index 0000000..205a1a9 --- /dev/null +++ b/src/test/scala/org/apache/sysml/api/ml/WrapperSparkContext.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.sysml.api.ml + +import org.scalatest.{ BeforeAndAfterAll, Suite } +import org.apache.spark.{ SparkConf, SparkContext } +import org.apache.spark.sql.SQLContext + +trait WrapperSparkContext extends BeforeAndAfterAll { self: Suite => + @transient var sc: SparkContext = _ + @transient var sqlContext: SQLContext = _ + + override def beforeAll() { + super.beforeAll() + val conf = new SparkConf() + .setMaster("local[2]") + .setAppName("MLlibUnitTest") + sc = new SparkContext(conf) + //SQLContext.clearActive() + sqlContext = new SQLContext(sc) + //SQLContext.setActive(sqlContext) + } + + override def afterAll() { + try { + sqlContext = null + //SQLContext.clearActive() + if (sc != null) { + sc.stop() + } + sc = null + } finally { + super.afterAll() + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/test/scala/org/apache/sysml/api/ml/scala/LogisticRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/src/test/scala/org/apache/sysml/api/ml/scala/LogisticRegressionSuite.scala b/src/test/scala/org/apache/sysml/api/ml/scala/LogisticRegressionSuite.scala deleted file mode 100644 index 8f36ef9..0000000 --- a/src/test/scala/org/apache/sysml/api/ml/scala/LogisticRegressionSuite.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.sysml.api.ml.scala - -import org.scalatest.FunSuite -import org.scalatest.Matchers -import org.apache.spark.Logging -import org.apache.spark.mllib.linalg.Vectors -import org.apache.spark.mllib.regression.LabeledPoint - -class LogisticRegressionSuite extends FunSuite with WrapperSparkContext with Matchers with Logging{ - - test("run logistic regression with default"){ - //Make sure system ml home set when run wrapper - val newsqlContext = new org.apache.spark.sql.SQLContext(sc); - import newsqlContext.implicits._ - val training = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), - LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)))) - val testing = sc.parallelize(Seq( - LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)), - LabeledPoint(1.0, Vectors.dense(1.0, 0.4, 2.1)), - LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)))) - val lr = new LogisticRegression("log",sc) - val lrmodel = lr.fit(training.toDF) - lrmodel.transform(testing.toDF).show - - lr.getIcpt shouldBe 0 - lrmodel.getIcpt shouldBe lr.getIcpt - lrmodel.getMaxInnerIter shouldBe lr.getMaxInnerIter - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/8a6b3856/src/test/scala/org/apache/sysml/api/ml/scala/WrapperSparkContext.scala ---------------------------------------------------------------------- diff --git a/src/test/scala/org/apache/sysml/api/ml/scala/WrapperSparkContext.scala b/src/test/scala/org/apache/sysml/api/ml/scala/WrapperSparkContext.scala deleted file mode 100644 index 58d629c..0000000 --- a/src/test/scala/org/apache/sysml/api/ml/scala/WrapperSparkContext.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.sysml.api.ml.scala - -import org.scalatest.{BeforeAndAfterAll, Suite} - -import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.SQLContext - -trait WrapperSparkContext extends BeforeAndAfterAll { self: Suite => - @transient var sc: SparkContext = _ - @transient var sqlContext: SQLContext = _ - - override def beforeAll() { - super.beforeAll() - val conf = new SparkConf() - .setMaster("local[2]") - .setAppName("MLlibUnitTest") - sc = new SparkContext(conf) - //SQLContext.clearActive() - sqlContext = new SQLContext(sc) - //SQLContext.setActive(sqlContext) - } - - override def afterAll() { - try { - sqlContext = null - //SQLContext.clearActive() - if (sc != null) { - sc.stop() - } - sc = null - } finally { - super.afterAll() - } - } - -} \ No newline at end of file
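A minimal usage sketch (not from this commit): because the relocated org.apache.sysml.api.ml.LogisticRegression is a standard Spark ML Estimator[LogisticRegressionModel], it can also be chained through org.apache.spark.ml.Pipeline instead of calling fit/transform directly as in LogisticRegressionExample above. This assumes SYSTEMML_HOME points at a directory whose 'algorithms' folder contains MultiLogReg.dml and GLM-predict.dml (or that the scripts are available on the classpath), and that the DataFrame carries the "label" and "features" columns the estimator expects; the object name PipelineSketch is hypothetical.

import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.spark.ml.{ Pipeline, PipelineStage }
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.sysml.api.ml.LogisticRegression

object PipelineSketch {
  def main(args: Array[String]): Unit = {
    // Local Spark context, mirroring the wrapper used by the test suite above.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("PipelineSketch"))
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    // Toy training data, mirroring LogisticRegressionExample above.
    val training = sc.parallelize(Seq(
      LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)),
      LabeledPoint(2.0, Vectors.dense(1.2, 0.0, 3.5)),
      LabeledPoint(1.0, Vectors.dense(1.0, 0.5, 2.2)))).toDF

    // The SystemML estimator runs MultiLogReg.dml when fit; as an Estimator it
    // can serve as a PipelineStage like any other Spark ML component.
    val lr = new LogisticRegression("log", sc)
    val pipeline = new Pipeline().setStages(Array[PipelineStage](lr))
    val model = pipeline.fit(training)

    // transform() scores the fitted model (GLM-predict.dml in this API).
    model.transform(training).show()

    sc.stop()
  }
}

The same pattern extends to multi-stage pipelines (for example, a feature transformer followed by this estimator), since only the Estimator/Model contract is relied on here.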