Repository: incubator-systemml Updated Branches: refs/heads/master f2344ff6f -> 172a2c511
[SYSTEMML-1088] [SYSTEMML-1090] Remove the need for label mapping for Scala wrappers - Also, removed the need to set SparkContext if already created (for example in interactive pyspark shell). - Updated the documentation. Project: http://git-wip-us.apache.org/repos/asf/incubator-systemml/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-systemml/commit/172a2c51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-systemml/tree/172a2c51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-systemml/diff/172a2c51 Branch: refs/heads/master Commit: 172a2c511b703b5f7ffd38ac8a19fd1a1853890e Parents: f2344ff Author: Niketan Pansare <npan...@us.ibm.com> Authored: Wed Nov 16 15:26:42 2016 -0800 Committer: Niketan Pansare <npan...@us.ibm.com> Committed: Wed Nov 16 15:28:31 2016 -0800 ---------------------------------------------------------------------- docs/beginners-guide-python.md | 60 +++++------ src/main/python/systemml/defmatrix.py | 6 +- src/main/python/systemml/mllearn/estimators.py | 104 ++++++++++++++----- .../sysml/api/ml/BaseSystemMLClassifier.scala | 27 ++--- .../sysml/api/ml/BaseSystemMLRegressor.scala | 8 +- .../apache/sysml/api/ml/LinearRegression.scala | 8 +- .../sysml/api/ml/LogisticRegression.scala | 16 +-- .../org/apache/sysml/api/ml/NaiveBayes.scala | 16 +-- .../apache/sysml/api/ml/PredictionUtils.scala | 81 --------------- .../scala/org/apache/sysml/api/ml/SVM.scala | 18 ++-- 10 files changed, 156 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/docs/beginners-guide-python.md ---------------------------------------------------------------------- diff --git a/docs/beginners-guide-python.md b/docs/beginners-guide-python.md index 65eef50..8d597bf 100644 --- a/docs/beginners-guide-python.md +++ b/docs/beginners-guide-python.md @@ -72,54 +72,41 @@ brew install apache-spark16 #### Step 1: Install SystemML Python package +We are working towards uploading the python package on pypi. Until then, please use following commands: + ```bash -pip install systemml +git checkout https://github.com/apache/incubator-systemml.git +cd incubator-systemml +mvn post-integration-test -P distribution -DskipTests +pip install src/main/python/dist/systemml-incubating-0.11.0.dev1.tar.gz ``` -#### Step 2: Download SystemML Java binaries - -SystemML Python package downloads the corresponding Java binaries (along with algorithms) and places them -into the installed location. To find the location of the downloaded Java binaries, use the following command: - +The above commands will install Python package and place the corresponding Java binaries (along with algorithms) into the installed location. +To find the location of the downloaded Java binaries, use the following command: ```bash python -c 'import imp; import os; print os.path.join(imp.find_module("systemml")[1], "systemml-java")' ``` -#### Step 3: (Optional but recommended) Set SYSTEMML_HOME environment variable -<div class="codetabs"> -<div data-lang="OSX" markdown="1"> -```bash -SYSTEMML_HOME=`python -c 'import imp; import os; print os.path.join(imp.find_module("systemml")[1], "systemml-java")'` -# If you are using zsh or ksh or csh, append it to ~/.zshrc or ~/.profile or ~/.login respectively. -echo '' >> ~/.bashrc -echo 'export SYSTEMML_HOME='$SYSTEMML_HOME >> ~/.bashrc -``` -</div> -<div data-lang="Linux" markdown="1"> -```bash -SYSTEMML_HOME=`python -c 'import imp; import os; print os.path.join(imp.find_module("systemml")[1], "systemml-java")'` -# If you are using zsh or ksh or csh, append it to ~/.zshrc or ~/.profile or ~/.login respectively. -echo '' >> ~/.bashrc -echo 'export SYSTEMML_HOME='$SYSTEMML_HOME >> ~/.bashrc -``` -</div> -</div> - Note: the user is free to either use the prepackaged Java binaries or download them from [SystemML website](http://systemml.apache.org/download.html) or build them from the [source](https://github.com/apache/incubator-systemml). +To uninstall SystemML, please use following command: +```bash +pip uninstall systemml-incubating +``` + ### Start Pyspark shell <div class="codetabs"> <div data-lang="OSX" markdown="1"> ```bash -pyspark --master local[*] --driver-class-path $SYSTEMML_HOME"/SystemML.jar" +pyspark --master local[*] ``` </div> <div data-lang="Linux" markdown="1"> ```bash -pyspark --master local[*] --driver-class-path $SYSTEMML_HOME"/SystemML.jar" +pyspark --master local[*] ``` </div> </div> @@ -131,7 +118,6 @@ To get started with SystemML, let's try few elementary matrix multiplication ope ```python import systemml as sml import numpy as np -sml.setSparkContext(sc) m1 = sml.matrix(np.ones((3,3)) + 2) m2 = sml.matrix(np.ones((3,3)) + 3) m2 = m1 * (m2 + m1) @@ -166,7 +152,6 @@ X_test = diabetes_X[-20:] y_train = diabetes.target[:-20] y_test = diabetes.target[-20:] # Train Linear Regression model -sml.setSparkContext(sc) X = sml.matrix(X_train) y = sml.matrix(y_train) A = X.transpose().dot(X) @@ -236,7 +221,7 @@ from pyspark.sql import SQLContext sqlCtx = SQLContext(sc) digits = datasets.load_digits() X_digits = digits.data -y_digits = digits.target + 1 +y_digits = digits.target n_samples = len(X_digits) X_train = X_digits[:.9 * n_samples] y_train = y_digits[:.9 * n_samples] @@ -260,18 +245,23 @@ To train the above algorithm on larger dataset, we can load the dataset into Dat from sklearn import datasets, neighbors from systemml.mllearn import LogisticRegression from pyspark.sql import SQLContext +import pandas as pd +from sklearn.metrics import accuracy_score import systemml as sml sqlCtx = SQLContext(sc) digits = datasets.load_digits() X_digits = digits.data -y_digits = digits.target + 1 +y_digits = digits.target n_samples = len(X_digits) # Split the data into training/testing sets and convert to PySpark DataFrame df_train = sml.convertToLabeledDF(sqlContext, X_digits[:.9 * n_samples], y_digits[:.9 * n_samples]) -X_test = X_digits[.9 * n_samples:] -y_test = y_digits[.9 * n_samples:] +X_test = sqlCtx.createDataFrame(pd.DataFrame(X_digits[.9 * n_samples:])) logistic = LogisticRegression(sqlCtx) -print('LogisticRegression score: %f' % logistic.fit(df_train).score(X_test, y_test)) +logistic.fit(df_train) +y_predicted = logistic.predict(X_test) +y_predicted = y_predicted.select('prediction').toPandas().as_matrix().flatten() +y_test = y_digits[.9 * n_samples:] +print('LogisticRegression score: %f' % accuracy_score(y_test, y_predicted)) ``` Output: http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/python/systemml/defmatrix.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/defmatrix.py b/src/main/python/systemml/defmatrix.py index 08c802f..be9bc5f 100644 --- a/src/main/python/systemml/defmatrix.py +++ b/src/main/python/systemml/defmatrix.py @@ -42,9 +42,13 @@ def setSparkContext(sc): matrix.sqlContext = SQLContext(sc) matrix.ml = MLContext(matrix.sc) + def checkIfMLContextIsSet(): if matrix.ml is None: - raise Exception('Expected setSparkContext(sc) to be called.') + if SparkContext._active_spark_context is not None: + setSparkContext(SparkContext._active_spark_context) + else: + raise Exception('Expected setSparkContext(sc) to be called, where sc is active SparkContext.') ########################## AST related operations ################################## http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/python/systemml/mllearn/estimators.py ---------------------------------------------------------------------- diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py index 7bf843f..d4ece89 100644 --- a/src/main/python/systemml/mllearn/estimators.py +++ b/src/main/python/systemml/mllearn/estimators.py @@ -26,6 +26,13 @@ from pyspark.ml import Estimator from pyspark.ml.feature import VectorAssembler from pyspark.sql import DataFrame import sklearn as sk +from sklearn.metrics import accuracy_score, r2_score +from py4j.protocol import Py4JError +import traceback +from sklearn.preprocessing import LabelEncoder +import threading +import time +import math from ..converters import * from ..classloader import * @@ -36,10 +43,10 @@ def assemble(sqlCtx, pdf, inputCols, outputCol): return assembler.transform(tmpDF) class BaseSystemMLEstimator(Estimator): - featuresCol = 'features' - labelCol = 'label' - - def setFeaturesCol(self, colName): + features_col = 'features' + label_col = 'label' + + def set_features_col(self, colName): """ Sets the default column name for features of PySpark DataFrame. @@ -47,9 +54,9 @@ class BaseSystemMLEstimator(Estimator): ---------- colName: column name for features (default: 'features') """ - self.featuresCol = colName + self.features_col = colName - def setLabelCol(self, colName): + def set_label_col(self, colName): """ Sets the default column name for features of PySpark DataFrame. @@ -57,8 +64,35 @@ class BaseSystemMLEstimator(Estimator): ---------- colName: column name for features (default: 'label') """ - self.labelCol = colName + self.label_col = colName + + def _fit_df(self): + try: + self.model = self.estimator.fit(self.X._jdf) + except Py4JError: + traceback.print_exc() + + def fit_df(self, X): + self.X = X + self._fit_df() + self.X = None + return self + + def _fit_numpy(self): + try: + self.model = self.estimator.fit(convertToMatrixBlock(self.sc, self.X), convertToMatrixBlock(self.sc, self.y)) + except Py4JError: + traceback.print_exc() + + def fit_numpy(self, X, y): + self.X = X + self.y = y + self._fit_numpy() + self.X = None + self.y = None + return self + # Returns a model after calling fit(df) on Estimator object on JVM def _fit(self, X): """ @@ -66,11 +100,10 @@ class BaseSystemMLEstimator(Estimator): Parameters ---------- - X: PySpark DataFrame that contain the columns featuresCol (default: 'features') and labelCol (default: 'label') + X: PySpark DataFrame that contain the columns features_col (default: 'features') and label_col (default: 'label') """ - if hasattr(X, '_jdf') and self.featuresCol in X.columns and self.labelCol in X.columns: - self.model = self.estimator.fit(X._jdf) - return self + if hasattr(X, '_jdf') and self.features_col in X.columns and self.label_col in X.columns: + return self.fit_df(X) else: raise Exception('Incorrect usage: Expected dataframe as input with features/label as columns') @@ -86,6 +119,7 @@ class BaseSystemMLEstimator(Estimator): if y is None: return self._fit(X) elif y is not None and isinstance(X, SUPPORTED_TYPES) and isinstance(y, SUPPORTED_TYPES): + y = self.encode(y) if self.transferUsingDF: pdfX = convertToPandasDF(X) pdfY = convertToPandasDF(y) @@ -94,14 +128,14 @@ class BaseSystemMLEstimator(Estimator): if pdfX.shape[0] != pdfY.shape[0]: raise Exception('Number of rows of X and y should match') colNames = pdfX.columns - pdfX[self.labelCol] = pdfY[pdfY.columns[0]] - df = assemble(self.sqlCtx, pdfX, colNames, self.featuresCol).select(self.featuresCol, self.labelCol) - self.model = self.estimator.fit(df._jdf) + pdfX[self.label_col] = pdfY[pdfY.columns[0]] + df = assemble(self.sqlCtx, pdfX, colNames, self.features_col).select(self.features_col, self.label_col) + self.fit_df(df) else: numColsy = getNumCols(y) if numColsy != 1: raise Exception('Expected y to be a column vector') - self.model = self.estimator.fit(convertToMatrixBlock(self.sc, X), convertToMatrixBlock(self.sc, y)) + self.fit_numpy(X, y) if self.setOutputRawPredictionsToFalse: self.model.setOutputRawPredictions(False) return self @@ -110,7 +144,7 @@ class BaseSystemMLEstimator(Estimator): def transform(self, X): return self.predict(X) - + # Returns either a DataFrame or MatrixBlock after calling transform(X:MatrixBlock, y:MatrixBlock) on Model object on JVM def predict(self, X): """ @@ -123,26 +157,29 @@ class BaseSystemMLEstimator(Estimator): if isinstance(X, SUPPORTED_TYPES): if self.transferUsingDF: pdfX = convertToPandasDF(X) - df = assemble(self.sqlCtx, pdfX, pdfX.columns, self.featuresCol).select(self.featuresCol) + df = assemble(self.sqlCtx, pdfX, pdfX.columns, self.features_col).select(self.features_col) retjDF = self.model.transform(df._jdf) retDF = DataFrame(retjDF, self.sqlCtx) retPDF = retDF.sort('__INDEX').select('prediction').toPandas() if isinstance(X, np.ndarray): - return retPDF.as_matrix().flatten() + return self.decode(retPDF.as_matrix().flatten()) else: - return retPDF + return self.decode(retPDF) else: - retNumPy = convertToNumPyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X))) + try: + retNumPy = self.decode(convertToNumPyArr(self.sc, self.model.transform(convertToMatrixBlock(self.sc, X)))) + except Py4JError: + traceback.print_exc() if isinstance(X, np.ndarray): return retNumPy else: return retNumPy # TODO: Convert to Pandas elif hasattr(X, '_jdf'): - if self.featuresCol in X.columns: + if self.features_col in X.columns: # No need to assemble as input DF is likely coming via MLPipeline df = X else: - assembler = VectorAssembler(inputCols=X.columns, outputCol=self.featuresCol) + assembler = VectorAssembler(inputCols=X.columns, outputCol=self.features_col) df = assembler.transform(X) retjDF = self.model.transform(df._jdf) retDF = DataFrame(retjDF, self.sqlCtx) @@ -154,6 +191,17 @@ class BaseSystemMLEstimator(Estimator): class BaseSystemMLClassifier(BaseSystemMLEstimator): + def encode(self, y): + self.le = LabelEncoder() + self.le.fit(y) + return self.le.transform(y) + 1 + + def decode(self, y): + if self.le is not None: + return self.le.inverse_transform(np.asarray(y - 1, dtype=int)) + else: + return [ self.labelMap[int(i)] for i in y ] + def score(self, X, y): """ Scores the predicted value with ground truth 'y' @@ -163,11 +211,17 @@ class BaseSystemMLClassifier(BaseSystemMLEstimator): X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix """ - return sk.metrics.accuracy_score(y, self.predict(X)) + return accuracy_score(y, self.predict(X)) class BaseSystemMLRegressor(BaseSystemMLEstimator): + def encode(self, y): + return y + + def decode(self, y): + return y + def score(self, X, y): """ Scores the predicted value with ground truth 'y' @@ -177,7 +231,7 @@ class BaseSystemMLRegressor(BaseSystemMLEstimator): X: NumPy ndarray, Pandas DataFrame, scipy sparse matrix y: NumPy ndarray, Pandas DataFrame, scipy sparse matrix """ - return sk.metrics.r2_score(y, self.predict(X), multioutput='variance_weighted') + return r2_score(y, self.predict(X), multioutput='variance_weighted') class LogisticRegression(BaseSystemMLClassifier): @@ -439,4 +493,4 @@ class NaiveBayes(BaseSystemMLClassifier): self.estimator = self.sc._jvm.org.apache.sysml.api.ml.NaiveBayes(self.uid, self.sc._jsc.sc()) self.estimator.setLaplace(laplace) self.transferUsingDF = transferUsingDF - self.setOutputRawPredictionsToFalse = False + self.setOutputRawPredictionsToFalse = False \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala index 539c2c1..d57130e 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala @@ -32,6 +32,7 @@ import org.apache.sysml.runtime.instructions.spark.utils.{ RDDConverterUtilsExt, import org.apache.sysml.api.mlcontext._ import org.apache.sysml.api.mlcontext.ScriptFactory._ import org.apache.spark.sql._ +import org.apache.sysml.api.mlcontext.MLContext.ExplainLevel trait HasLaplace extends Params { final val laplace: Param[Double] = new Param[Double](this, "laplace", "Laplace smoothing specified by the user to avoid creation of 0 probabilities.") @@ -96,49 +97,48 @@ trait BaseSystemMLEstimatorModel { trait BaseSystemMLClassifier extends BaseSystemMLEstimator { - def fit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): (MLResults, java.util.HashMap[Int, String]) = { + def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true val ml = new MLContext(sc) - val revLabelMapping = new java.util.HashMap[Int, String] - PredictionUtils.fillLabelMapping(y_mb, revLabelMapping) y_mb.recomputeNonZeros(); val ret = getTrainingScript(isSingleNode) val script = ret._1.in(ret._2, X_mb).in(ret._3, y_mb) - (ml.execute(script), revLabelMapping) + ml.execute(script) } - def fit(df: ScriptsUtils.SparkDataType, sc: SparkContext): (MLResults, java.util.HashMap[Int, String]) = { + def baseFit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { val isSingleNode = false val ml = new MLContext(df.rdd.sparkContext) val mcXin = new MatrixCharacteristics() val Xin = RDDConverterUtils.dataFrameToBinaryBlock(sc, df.asInstanceOf[DataFrame].select("features"), mcXin, false, true) val revLabelMapping = new java.util.HashMap[Int, String] - val yin = PredictionUtils.fillLabelMapping(df, revLabelMapping) + val yin = df.select("label") val ret = getTrainingScript(isSingleNode) val Xbin = new BinaryBlockMatrix(Xin, mcXin) val script = ret._1.in(ret._2, Xbin).in(ret._3, yin) - (ml.execute(script), revLabelMapping) + ml.execute(script) } } trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { - def transform(X: MatrixBlock, mloutput: MLResults, labelMapping: java.util.HashMap[Int, String], sc: SparkContext, probVar:String): MatrixBlock = { + def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, probVar:String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) val script = getPredictionScript(mloutput, isSingleNode) - val modelPredict = ml.execute(script._1.in(script._2, X)) + // Uncomment for debugging + // ml.setExplainLevel(ExplainLevel.RECOMPILE_RUNTIME) + val modelPredict = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros))) val ret = PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, isSingleNode, sc, probVar) .getBinaryBlockMatrix("Prediction").getMatrixBlock if(ret.getNumColumns != 1) { throw new RuntimeException("Expected predicted label to be a column vector") } - PredictionUtils.updateLabels(isSingleNode, null, ret, null, labelMapping) return ret } - def transform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, labelMapping: java.util.HashMap[Int, String], sc: SparkContext, + def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, probVar:String, outputProb:Boolean=true): DataFrame = { val isSingleNode = false val ml = new MLContext(sc) @@ -148,7 +148,8 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { val Xin_bin = new BinaryBlockMatrix(Xin, mcXin) val modelPredict = ml.execute(script._1.in(script._2, Xin_bin)) val predLabelOut = PredictionUtils.computePredictedClassLabelsFromProbability(modelPredict, isSingleNode, sc, probVar) - val predictedDF = PredictionUtils.updateLabels(isSingleNode, predLabelOut.getDataFrame("Prediction"), null, "C1", labelMapping).select(RDDConverterUtils.DF_ID_COLUMN, "prediction") + val predictedDF = predLabelOut.getDataFrame("Prediction").select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction") + if(outputProb) { val prob = modelPredict.getDataFrame(probVar, true).withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability") val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sqlContext, RDDConverterUtils.DF_ID_COLUMN) @@ -160,4 +161,4 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel { } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala index d0445d2..c47fb3c 100644 --- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala +++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLRegressor.scala @@ -35,7 +35,7 @@ import org.apache.sysml.api.mlcontext.ScriptFactory._ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { - def fit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { + def baseFit(X_mb: MatrixBlock, y_mb: MatrixBlock, sc: SparkContext): MLResults = { val isSingleNode = true val ml = new MLContext(sc) val ret = getTrainingScript(isSingleNode) @@ -43,7 +43,7 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { ml.execute(script) } - def fit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { + def baseFit(df: ScriptsUtils.SparkDataType, sc: SparkContext): MLResults = { val isSingleNode = false val ml = new MLContext(df.rdd.sparkContext) val mcXin = new MatrixCharacteristics() @@ -58,7 +58,7 @@ trait BaseSystemMLRegressor extends BaseSystemMLEstimator { trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { - def transform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, predictionVar:String): MatrixBlock = { + def baseTransform(X: MatrixBlock, mloutput: MLResults, sc: SparkContext, predictionVar:String): MatrixBlock = { val isSingleNode = true val ml = new MLContext(sc) val script = getPredictionScript(mloutput, isSingleNode) @@ -71,7 +71,7 @@ trait BaseSystemMLRegressorModel extends BaseSystemMLEstimatorModel { return ret } - def transform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, predictionVar:String): DataFrame = { + def baseTransform(df: ScriptsUtils.SparkDataType, mloutput: MLResults, sc: SparkContext, predictionVar:String): DataFrame = { val isSingleNode = false val ml = new MLContext(sc) val mcXin = new MatrixCharacteristics() http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala index cce646d..76bc0a3 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LinearRegression.scala @@ -73,10 +73,10 @@ class LinearRegression(override val uid: String, val sc: SparkContext, val solve } def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LinearRegressionModel = - new LinearRegressionModel("lr")(fit(X_mb, y_mb, sc), sc) + new LinearRegressionModel("lr")(baseFit(X_mb, y_mb, sc), sc) def fit(df: ScriptsUtils.SparkDataType): LinearRegressionModel = - new LinearRegressionModel("lr")(fit(df, sc), sc) + new LinearRegressionModel("lr")(baseFit(df, sc), sc) } @@ -90,8 +90,8 @@ class LinearRegressionModel(override val uid: String)(val mloutput: MLResults, v def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = PredictionUtils.getGLMPredictionScript(mloutput.getBinaryBlockMatrix("beta_out"), isSingleNode) - def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, sc, "means") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "means") - def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, sc, "means") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "means") } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala index a9ca6ab..18eadec 100644 --- a/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala +++ b/src/main/scala/org/apache/sysml/api/ml/LogisticRegression.scala @@ -56,13 +56,13 @@ class LogisticRegression(override val uid: String, val sc: SparkContext) extends // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): LogisticRegressionModel = { - val ret = fit(X_mb, y_mb, sc) - new LogisticRegressionModel("log")(ret._1, ret._2, sc) + val ret = baseFit(X_mb, y_mb, sc) + new LogisticRegressionModel("log")(ret, sc) } def fit(df: ScriptsUtils.SparkDataType): LogisticRegressionModel = { - val ret = fit(df, sc) - new LogisticRegressionModel("log")(ret._1, ret._2, sc) + val ret = baseFit(df, sc) + new LogisticRegressionModel("log")(ret, sc) } @@ -89,11 +89,11 @@ object LogisticRegressionModel { */ class LogisticRegressionModel(override val uid: String)( - val mloutput: MLResults, val labelMapping: java.util.HashMap[Int, String], val sc: SparkContext) + val mloutput: MLResults, val sc: SparkContext) extends Model[LogisticRegressionModel] with HasIcpt with HasRegParam with HasTol with HasMaxOuterIter with HasMaxInnerIter with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): LogisticRegressionModel = { - val that = new LogisticRegressionModel(uid)(mloutput, labelMapping, sc) + val that = new LogisticRegressionModel(uid)(mloutput, sc) copyValues(that, extra) } var outputRawPredictions = true @@ -102,8 +102,8 @@ class LogisticRegressionModel(override val uid: String)( def getPredictionScript(mloutput: MLResults, isSingleNode:Boolean): (Script, String) = PredictionUtils.getGLMPredictionScript(mloutput.getBinaryBlockMatrix("B_out"), isSingleNode, 3) - def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "means") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "means") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "means") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "means") } /** http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala index fd05f27..a7b3a74 100644 --- a/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala +++ b/src/main/scala/org/apache/sysml/api/ml/NaiveBayes.scala @@ -46,13 +46,13 @@ class NaiveBayes(override val uid: String, val sc: SparkContext) extends Estimat // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): NaiveBayesModel = { - val ret = fit(X_mb, y_mb, sc) - new NaiveBayesModel("naive")(ret._1, ret._2, sc) + val ret = baseFit(X_mb, y_mb, sc) + new NaiveBayesModel("naive")(ret, sc) } def fit(df: ScriptsUtils.SparkDataType): NaiveBayesModel = { - val ret = fit(df, sc) - new NaiveBayesModel("naive")(ret._1, ret._2, sc) + val ret = baseFit(df, sc) + new NaiveBayesModel("naive")(ret, sc) } def getTrainingScript(isSingleNode:Boolean):(Script, String, String) = { @@ -74,11 +74,11 @@ object NaiveBayesModel { } class NaiveBayesModel(override val uid: String) - (val mloutput: MLResults, val labelMapping: java.util.HashMap[Int, String], val sc: SparkContext) + (val mloutput: MLResults, val sc: SparkContext) extends Model[NaiveBayesModel] with HasLaplace with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): NaiveBayesModel = { - val that = new NaiveBayesModel(uid)(mloutput, labelMapping, sc) + val that = new NaiveBayesModel(uid)(mloutput, sc) copyValues(that, extra) } @@ -103,7 +103,7 @@ class NaiveBayesModel(override val uid: String) (ret, "D") } - def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "probs") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "probs") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "probs") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "probs") } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala b/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala index 0811b1b..585339f 100644 --- a/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala +++ b/src/main/scala/org/apache/sysml/api/ml/PredictionUtils.scala @@ -49,87 +49,6 @@ object PredictionUtils { (ret, "X") } - def fillLabelMapping(df: ScriptsUtils.SparkDataType, revLabelMapping: java.util.HashMap[Int, String]): RDD[String] = { - val temp = df.select("label").distinct.rdd.map(_.apply(0).toString).collect() - val labelMapping = new java.util.HashMap[String, Int] - for(i <- 0 until temp.length) { - labelMapping.put(temp(i), i+1) - revLabelMapping.put(i+1, temp(i)) - } - df.select("label").rdd.map( x => labelMapping.get(x.apply(0).toString).toString ) - } - - def fillLabelMapping(y_mb: MatrixBlock, revLabelMapping: java.util.HashMap[Int, String]): Unit = { - val labelMapping = new java.util.HashMap[String, Int] - if(y_mb.getNumColumns != 1) { - throw new RuntimeException("Expected a column vector for y") - } - if(y_mb.isInSparseFormat()) { - throw new DMLRuntimeException("Sparse block is not implemented for fit") - } - else { - val denseBlock = y_mb.getDenseBlock() - var id:Int = 1 - for(i <- 0 until denseBlock.length) { - val v = denseBlock(i).toString() - if(!labelMapping.containsKey(v)) { - labelMapping.put(v, id) - revLabelMapping.put(id, v) - id += 1 - } - denseBlock.update(i, labelMapping.get(v)) - } - } - } - - class LabelMappingData(val labelMapping: java.util.HashMap[Int, String]) extends Serializable { - def mapLabelStr(x:Double):String = { - if(labelMapping.containsKey(x.toInt)) - labelMapping.get(x.toInt) - else - throw new RuntimeException("Incorrect label mapping") - } - def mapLabelDouble(x:Double):Double = { - if(labelMapping.containsKey(x.toInt)) - labelMapping.get(x.toInt).toDouble - else - throw new RuntimeException("Incorrect label mapping") - } - val mapLabel_udf = { - try { - val it = labelMapping.values().iterator() - while(it.hasNext()) { - it.next().toDouble - } - udf(mapLabelDouble _) - } catch { - case e: Exception => udf(mapLabelStr _) - } - } - } - def updateLabels(isSingleNode:Boolean, df:DataFrame, X: MatrixBlock, labelColName:String, labelMapping: java.util.HashMap[Int, String]): DataFrame = { - if(isSingleNode) { - if(X.isInSparseFormat()) { - throw new RuntimeException("Since predicted label is a column vector, expected it to be in dense format") - } - for(i <- 0 until X.getNumRows) { - val v:Int = X.getValue(i, 0).toInt - if(labelMapping.containsKey(v)) { - X.setValue(i, 0, labelMapping.get(v).toDouble) - } - else { - throw new RuntimeException("No mapping found for " + v + " in " + labelMapping.toString()) - } - } - return null - } - else { - val serObj = new LabelMappingData(labelMapping) - return df.withColumn(labelColName, serObj.mapLabel_udf(df(labelColName))) - .withColumnRenamed(labelColName, "prediction") - } - } - def joinUsingID(df1:DataFrame, df2:DataFrame):DataFrame = { df1.join(df2, RDDConverterUtils.DF_ID_COLUMN) } http://git-wip-us.apache.org/repos/asf/incubator-systemml/blob/172a2c51/src/main/scala/org/apache/sysml/api/ml/SVM.scala ---------------------------------------------------------------------- diff --git a/src/main/scala/org/apache/sysml/api/ml/SVM.scala b/src/main/scala/org/apache/sysml/api/ml/SVM.scala index 07a7283..ea24de6 100644 --- a/src/main/scala/org/apache/sysml/api/ml/SVM.scala +++ b/src/main/scala/org/apache/sysml/api/ml/SVM.scala @@ -67,13 +67,13 @@ class SVM (override val uid: String, val sc: SparkContext, val isMultiClass:Bool // Note: will update the y_mb as this will be called by Python mllearn def fit(X_mb: MatrixBlock, y_mb: MatrixBlock): SVMModel = { - val ret = fit(X_mb, y_mb, sc) - new SVMModel("svm")(ret._1, sc, isMultiClass, ret._2) + val ret = baseFit(X_mb, y_mb, sc) + new SVMModel("svm")(ret, sc, isMultiClass) } def fit(df: ScriptsUtils.SparkDataType): SVMModel = { - val ret = fit(df, sc) - new SVMModel("svm")(ret._1, sc, isMultiClass, ret._2) + val ret = baseFit(df, sc) + new SVMModel("svm")(ret, sc, isMultiClass) } } @@ -83,10 +83,10 @@ object SVMModel { final val predictionScriptPathMulticlass = "scripts" + File.separator + "algorithms" + File.separator + "m-svm-predict.dml" } -class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: SparkContext, val isMultiClass:Boolean, - val labelMapping: java.util.HashMap[Int, String]) extends Model[SVMModel] with BaseSystemMLClassifierModel { +class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: SparkContext, val isMultiClass:Boolean) + extends Model[SVMModel] with BaseSystemMLClassifierModel { override def copy(extra: ParamMap): SVMModel = { - val that = new SVMModel(uid)(mloutput, sc, isMultiClass, labelMapping) + val that = new SVMModel(uid)(mloutput, sc, isMultiClass) copyValues(that, extra) } @@ -108,6 +108,6 @@ class SVMModel (override val uid: String)(val mloutput: MLResults, val sc: Spark (ret, "X") } - def transform(X: MatrixBlock): MatrixBlock = transform(X, mloutput, labelMapping, sc, "scores") - def transform(df: ScriptsUtils.SparkDataType): DataFrame = transform(df, mloutput, labelMapping, sc, "scores") + def transform(X: MatrixBlock): MatrixBlock = baseTransform(X, mloutput, sc, "scores") + def transform(df: ScriptsUtils.SparkDataType): DataFrame = baseTransform(df, mloutput, sc, "scores") } \ No newline at end of file