This is an automated email from the ASF dual-hosted git repository.

niketanpansare pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/systemml.git


The following commit(s) were added to refs/heads/master by this push:
     new 69c6a1a  [SYSTEMML-540] Fixed the failing Python tests and refactored BaseSystemMLClassifier class
69c6a1a is described below

commit 69c6a1acb1481b1537bdc850654a4eb0a8efe20b
Author: Niketan Pansare <npan...@us.ibm.com>
AuthorDate: Tue Mar 19 11:40:28 2019 -0700

    [SYSTEMML-540] Fixed the failing Python tests and refactored BaseSystemMLClassifier class
    
    - Fixed failing mllearn numpy and df tests.
    - Added a fix in the converter util method that converts a Spark DF to Pandas. This is required as of Spark 2.3+ (a short sketch of the underlying issue follows the estimators.py diff below).
    - Also, updated the nn tests to match the results of the latest Keras/TF release, especially the Flatten layer.
    - Added a warning message when the user attempts to write a metadata file with an empty name.
---
 .../apache/sysml/runtime/util/MapReduceTool.java   |   4 +
 src/main/python/systemml/mllearn/estimators.py     |   2 +-
 src/main/python/tests/test_mllearn_df.py           |  15 ++-
 src/main/python/tests/test_mllearn_numpy.py        |  44 ++++--
 src/main/python/tests/test_nn_numpy.py             |  27 ++--
 .../sysml/api/ml/BaseSystemMLClassifier.scala      | 148 ++++++++++-----------
 6 files changed, 134 insertions(+), 106 deletions(-)

diff --git a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
index cecd0e3..d1f1be5 100644
--- a/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
+++ b/src/main/java/org/apache/sysml/runtime/util/MapReduceTool.java
@@ -422,6 +422,9 @@ public class MapReduceTool
                throws IOException 
        {
                Path path = new Path(mtdfile);
+               if(path.getName().equals(" .mtd")) {
+                       LOG.warn("Performing a write on an empty mtd path: " + mtdfile + ". This can lead to unexpected behavior.");
+               }
                FileSystem fs = IOUtilFunctions.getFileSystem(path);
                try( BufferedWriter br = new BufferedWriter(new OutputStreamWriter(fs.create(path,true))) ) {
                        String mtd = metaDataToString(vt, schema, dt, mc, outinfo, formatProperties);
@@ -429,6 +432,7 @@ public class MapReduceTool
                } catch (Exception e) {
                        throw new IOException("Error creating and writing metadata JSON file", e);
                }
+               
        }
 
        public static void writeScalarMetaDataFile(String mtdfile, ValueType vt)
diff --git a/src/main/python/systemml/mllearn/estimators.py b/src/main/python/systemml/mllearn/estimators.py
index 2c3b6a2..144cf66 100644
--- a/src/main/python/systemml/mllearn/estimators.py
+++ b/src/main/python/systemml/mllearn/estimators.py
@@ -314,7 +314,7 @@ class BaseSystemMLEstimator(Estimator):
         output: a java-side object (either MatrixBlock or Java DataFrame)
         """
         if isinstance(X, SUPPORTED_TYPES) and self.transferUsingDF:
-            retDF = DataFrame(output, self.sparkSession)
+            retDF = DataFrame(output, self.sparkSession._wrapped)
             retPDF = retDF.sort('__INDEX').select('prediction').toPandas()
            return retPDF.as_matrix().flatten() if isinstance(X, np.ndarray) else retPDF
         elif isinstance(X, SUPPORTED_TYPES):
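
A minimal sketch of the Spark 2.3+ issue behind the one-line estimators.py
change above (the java_df handle and wrap_java_df helper are hypothetical
stand-ins): pyspark's DataFrame constructor expects a SQLContext rather than a
SparkSession, and SparkSession._wrapped exposes that underlying SQLContext.

    from pyspark.sql import DataFrame, SparkSession

    spark = SparkSession.builder.getOrCreate()

    def wrap_java_df(java_df):
        # java_df: a Java-side DataFrame handle (e.g. obtained via py4j).
        # Passing the SparkSession itself can break downstream calls such as
        # toPandas() on Spark 2.3+, hence the switch to spark._wrapped.
        return DataFrame(java_df, spark._wrapped)
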
diff --git a/src/main/python/tests/test_mllearn_df.py b/src/main/python/tests/test_mllearn_df.py
index c2f8a3e..4f94589 100644
--- a/src/main/python/tests/test_mllearn_df.py
+++ b/src/main/python/tests/test_mllearn_df.py
@@ -24,6 +24,7 @@
 #   - Python 2: `PYSPARK_PYTHON=python2 spark-submit --master local[*] --driver-class-path SystemML.jar test_mllearn_df.py`
 #   - Python 3: `PYSPARK_PYTHON=python3 spark-submit --master local[*] --driver-class-path SystemML.jar test_mllearn_df.py`
 
+
 # Make the `systemml` package importable
 import os
 import sys
@@ -45,6 +46,16 @@ from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, S
 
 sparkSession = SparkSession.builder.getOrCreate()
 
+def test_accuracy_score(sklearn_predicted, mllearn_predicted, y_test, threshold):
+    if accuracy_score(sklearn_predicted, mllearn_predicted) > threshold:
+        # Our results match those of scikit-learn; no need to compare against the ground truth
+        return True
+    elif accuracy_score(y_test, mllearn_predicted) > accuracy_score(y_test, sklearn_predicted):
+        # We perform better than scikit-learn, so ignore the threshold
+        return True
+    else:
+        return False
+
 # Currently not integrated with JUnit test
 # ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py
 class TestMLLearn(unittest.TestCase):
@@ -64,7 +75,7 @@ class TestMLLearn(unittest.TestCase):
         mllearn_predicted = logistic.predict(X_test)
         sklearn_logistic = linear_model.LogisticRegression()
         sklearn_logistic.fit(X_train, y_train)
-        self.failUnless(accuracy_score(sklearn_logistic.predict(X_test), mllearn_predicted) > 0.95) # We are comparable to a similar algorithm in scikit learn
+        self.failUnless(test_accuracy_score(sklearn_logistic.predict(X_test), mllearn_predicted, y_test, 0.95))
 
     def test_linear_regression(self):
         diabetes = datasets.load_diabetes()
@@ -109,7 +120,7 @@ class TestMLLearn(unittest.TestCase):
         from sklearn import linear_model, svm
         clf = svm.LinearSVC()
         sklearn_predicted = clf.fit(X_train, y_train).predict(X_test)
-        self.failUnless(accuracy_score(sklearn_predicted, mllearn_predicted) > 0.95 )
+        self.failUnless(test_accuracy_score(sklearn_predicted, mllearn_predicted, y_test, 0.95))
 
 
 if __name__ == '__main__':
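
A worked example of the test_accuracy_score acceptance rule added above (toy
labels, illustrative numbers only): agreement between the two models is 0.8,
below the 0.95 threshold, yet the fallback branch still accepts the run
because mllearn beats scikit-learn against the ground truth.

    from sklearn.metrics import accuracy_score

    y_test            = [1, 0, 1, 1, 0]
    sklearn_predicted = [1, 0, 0, 1, 0]  # 0.8 accuracy vs y_test
    mllearn_predicted = [1, 0, 1, 1, 0]  # 1.0 accuracy vs y_test

    # Model agreement is only 0.8 (< 0.95), but the elif branch passes:
    assert accuracy_score(y_test, mllearn_predicted) > accuracy_score(y_test, sklearn_predicted)
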
diff --git a/src/main/python/tests/test_mllearn_numpy.py b/src/main/python/tests/test_mllearn_numpy.py
index 884dd36..74fd54b 100644
--- a/src/main/python/tests/test_mllearn_numpy.py
+++ b/src/main/python/tests/test_mllearn_numpy.py
@@ -25,7 +25,7 @@
 #   - Python 3: `PYSPARK_PYTHON=python3 spark-submit --master local[*] --driver-class-path SystemML.jar test_mllearn_numpy.py`
 
 # Make the `systemml` package importable
-import os
+import os, math
 import sys
 path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
 sys.path.insert(0, path)
@@ -42,6 +42,8 @@ from sklearn.feature_extraction.text import TfidfVectorizer
 from sklearn.metrics import accuracy_score, r2_score
 from systemml.mllearn import LinearRegression, LogisticRegression, NaiveBayes, SVM
 from sklearn import linear_model
+from sklearn.datasets import make_classification
+from sklearn.model_selection import train_test_split
 
 sparkSession = SparkSession.builder.getOrCreate()
 
@@ -58,6 +60,23 @@ def deleteIfExists(fileName):
        except OSError:
                pass
 
+def get_classification_data(n_samples=10000, n_features=100, n_clusters_per_class=1, n_classes=10):
+    n_informative = int(math.log(n_classes * n_clusters_per_class, 2)) + 1
+    X, y = make_classification(n_samples=n_samples, n_features=n_features, n_redundant=0, n_informative=n_informative, random_state=1,
+                               n_clusters_per_class=n_clusters_per_class, n_classes=n_classes)
+    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42)
+    return X_train, X_test, y_train, y_test
+
+def test_accuracy_score(sklearn_predicted, mllearn_predicted, y_test, threshold):
+    if accuracy_score(sklearn_predicted, mllearn_predicted) > threshold:
+        # Our results match those of scikit-learn; no need to compare against the ground truth
+        return True
+    elif accuracy_score(y_test, mllearn_predicted) > accuracy_score(y_test, sklearn_predicted):
+        # We perform better than scikit-learn, so ignore the threshold
+        return True
+    else:
+        return False
+
 # Currently not integrated with JUnit test
 # ~/spark-1.6.1-scala-2.11/bin/spark-submit --master local[*] --driver-class-path SystemML.jar test.py
 class TestMLLearn(unittest.TestCase):
@@ -75,8 +94,17 @@ class TestMLLearn(unittest.TestCase):
         mllearn_predicted = logistic.predict(X_test)
         sklearn_logistic = linear_model.LogisticRegression()
         sklearn_logistic.fit(X_train, y_train)
-        self.failUnless(accuracy_score(sklearn_logistic.predict(X_test), mllearn_predicted) > 0.95) # We are comparable to a similar algorithm in scikit learn
-    
+        self.failUnless(test_accuracy_score(sklearn_logistic.predict(X_test), mllearn_predicted, y_test, 0.95))
+
+    def test_logistic_random_data(self):
+        X_train, X_test, y_train, y_test = get_classification_data(n_classes=2)
+        logistic = LogisticRegression(sparkSession)
+        logistic.fit(X_train, y_train)
+        mllearn_predicted = logistic.predict(X_test)
+        sklearn_logistic = linear_model.LogisticRegression()
+        sklearn_logistic.fit(X_train, y_train)
+        self.failUnless(test_accuracy_score(sklearn_logistic.predict(X_test), mllearn_predicted, y_test, 0.95))
+
     def test_logistic_mlpipeline(self):
         training = sparkSession.createDataFrame([
             ("a b c d e spark", 1.0),
@@ -148,12 +176,10 @@ class TestMLLearn(unittest.TestCase):
         y_test = y_digits[int(.9 * n_samples):]
         svm = SVM(sparkSession, is_multi_class=True, tol=0.0001)
         mllearn_predicted = svm.fit(X_train, y_train).predict(X_test)
-        from sklearn import linear_model, svm
+        from sklearn import svm
         clf = svm.LinearSVC()
         sklearn_predicted = clf.fit(X_train, y_train).predict(X_test)
-        accuracy = accuracy_score(sklearn_predicted, mllearn_predicted)
-        evaluation = 'test_svm accuracy_score(sklearn_predicted, mllearn_predicted) was {}'.format(accuracy)
-        self.failUnless(accuracy > 0.95, evaluation)
+        self.failUnless(test_accuracy_score(sklearn_predicted, mllearn_predicted, y_test, 0.95))
 
     def test_naive_bayes(self):
         digits = datasets.load_digits()
@@ -169,8 +195,8 @@ class TestMLLearn(unittest.TestCase):
         from sklearn.naive_bayes import MultinomialNB
         clf = MultinomialNB()
         sklearn_predicted = clf.fit(X_train, y_train).predict(X_test)
-        self.failUnless(accuracy_score(sklearn_predicted, mllearn_predicted) > 0.95 )
-        
+        self.failUnless(test_accuracy_score(sklearn_predicted, mllearn_predicted, y_test, 0.95))
+
     def test_naive_bayes1(self):
         categories = ['alt.atheism', 'talk.religion.misc', 'comp.graphics', 'sci.space']
         newsgroups_train = fetch_20newsgroups(subset='train', categories=categories)
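
A note on the n_informative formula in get_classification_data above:
scikit-learn's make_classification places class clusters at distinct vertices
of an n_informative-dimensional hypercube, so it requires
n_classes * n_clusters_per_class <= 2 ** n_informative, and
int(log2(...)) + 1 is always a sufficient exponent.

    import math

    n_classes, n_clusters_per_class = 10, 1
    n_informative = int(math.log(n_classes * n_clusters_per_class, 2)) + 1  # = 4
    assert n_classes * n_clusters_per_class <= 2 ** n_informative           # 10 <= 16
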
diff --git a/src/main/python/tests/test_nn_numpy.py b/src/main/python/tests/test_nn_numpy.py
index 76d9619..80d3151 100644
--- a/src/main/python/tests/test_nn_numpy.py
+++ b/src/main/python/tests/test_nn_numpy.py
@@ -36,6 +36,8 @@ import sys
 path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "../")
 sys.path.insert(0, path)
 
+TEST_GPU = False
+
 import unittest
 
 import numpy as np
@@ -48,20 +50,17 @@ from systemml.mllearn import Keras2DML
 from pyspark.sql import SparkSession
 from pyspark import SparkContext
 from keras.utils import np_utils
-from scipy import stats
-from sklearn.preprocessing import normalize
 from operator import mul
 
 batch_size = 32
 K.set_image_data_format('channels_first')
-# K.set_image_dim_ordering("th")
 
-def get_tensor(shape, random=True):
+def get_tensor(shape):
     if shape[0] is None:
         # If the first dimension is None, use batch size:
         shape = list(shape)
         shape[0] = batch_size
-    return (np.random.randint(100, size=shape) + 1) / 100
+    return (np.random.randint(1000, size=shape).astype(np.float32) + 1) / 1000
 
 tmp_dir = 'tmp_dir'
 
@@ -94,10 +93,11 @@ def get_sysml_model(keras_model):
     # By performing one-hot encoding outside, we ensure that the ordering of the TF columns
     # matches that of SystemML
     sysml_model.set(train_algo='batch', perform_one_hot_encoding=False)
+    sysml_model.setGPU(TEST_GPU)
     # print('Script:' + str(sysml_model.get_training_script()))
     return sysml_model
 
-def base_test(layers, add_dense=False, test_backward=True):
+def base_test(layers, add_dense=False, test_backward=True, reshuffle_keras_output=False):
     layers = [layers] if not isinstance(layers, list) else layers
     in_shape, output_shape = get_input_output_shape(layers)
     # --------------------------------------
@@ -118,8 +118,8 @@ def base_test(layers, add_dense=False, test_backward=True):
     sysml_model = get_sysml_model(keras_model)
     keras_tensor = get_tensor(in_shape)
     sysml_matrix = keras_tensor.reshape((batch_size, -1))
-    #if len(keras_tensor.shape) == 4:
-    #    keras_tensor = np.flip(keras_tensor, 1)
+    # if len(keras_tensor.shape) == 4:
+    #     keras_tensor = np.flip(keras_tensor, 1)
     # --------------------------------------
     sysml_preds = sysml_model.predict_proba(sysml_matrix)
     if test_backward:
@@ -131,13 +131,14 @@ def base_test(layers, add_dense=False, test_backward=True):
         keras_model.train_on_batch(keras_tensor, one_hot_labels)
         keras_preds = keras_model.predict(keras_tensor)
     # --------------------------------------
-    if len(output_shape) == 4:
+    if len(output_shape) > 4:
+        raise Exception('Unsupported output shape:' + str(output_shape))
+    if len(output_shape) == 4 and reshuffle_keras_output:
+        # This is not required as of Keras 2.1.5 and TensorFlow 1.11.0, but keeping it for backward compatibility.
+        # Flatten does not respect channels_first, so reshuffle the dimensions:
+        keras_preds = keras_preds.reshape((batch_size, output_shape[2], output_shape[3], output_shape[1]))
+        keras_preds = np.swapaxes(keras_preds, 2, 3)  # (h,w,c) -> (h,c,w)
+        keras_preds = np.swapaxes(keras_preds, 1, 2)  # (h,c,w) -> (c,h,w)
-    elif len(output_shape) > 4:
-        raise Exception('Unsupported output shape:' + str(output_shape))
     # --------------------------------------
     return sysml_preds, keras_preds, keras_model, output_shape
 
@@ -179,8 +180,8 @@ class TestNNLibrary(unittest.TestCase):
     def test_lstm_forward1(self):
         self.failUnless(test_forward(LSTM(2, return_sequences=True, activation='tanh', stateful=False, recurrent_activation='sigmoid', input_shape=(3, 4))))
 
-    #def test_lstm_backward1(self):
-    #    self.failUnless(test_backward(LSTM(2, return_sequences=True, activation='tanh', stateful=False, recurrent_activation='sigmoid',  input_shape=(3, 4))))
+    def test_lstm_backward1(self):
+        self.failUnless(test_backward(LSTM(2, return_sequences=True, activation='tanh', stateful=False, recurrent_activation='sigmoid',  input_shape=(3, 4))))
 
     def test_lstm_forward2(self):
         self.failUnless(test_forward(LSTM(10, return_sequences=False, activation='tanh', stateful=False, recurrent_activation='sigmoid', input_shape=(30, 20))))
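
The reshuffle_keras_output branch above can be sanity-checked with plain
numpy (shapes are illustrative): simulate a channels_last Flatten of a
channels_first tensor, then verify that the reshape plus the two swapaxes
calls recover the original NCHW layout.

    import numpy as np

    N, C, H, W = 2, 3, 4, 5
    x = np.arange(N * C * H * W).reshape((N, C, H, W))     # channels_first
    flat = np.transpose(x, (0, 2, 3, 1)).reshape((N, -1))  # channels_last flatten

    y = flat.reshape((N, H, W, C))
    y = np.swapaxes(y, 2, 3)  # (h,w,c) -> (h,c,w)
    y = np.swapaxes(y, 1, 2)  # (h,c,w) -> (c,h,w)
    assert np.array_equal(x, y)
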
diff --git a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
index c1146d1..c46310d 100644
--- a/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
+++ b/src/main/scala/org/apache/sysml/api/ml/BaseSystemMLClassifier.scala
@@ -255,95 +255,84 @@ trait BaseSystemMLClassifier extends BaseSystemMLEstimator {
   }
 }
 
-trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
-
-  def baseTransform(X_file: String, sc: SparkContext, probVar: String): String = baseTransform(X_file, sc, probVar, -1, 1, 1)
-  def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock = baseTransform(X, sc, probVar, -1, 1, 1)
- 
-  def baseTransform(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = {
-    val Prob = baseTransformHelper(X, sc, probVar, C, H, W)
-    val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W); " + dmlWrite("Prediction"))
-      .in("Prob", Prob)
-      .in("C", C)
-      .in("H", H)
-      .in("W", W)
-    val ml           = new MLContext(sc)
-    updateML(ml)
-    ml.execute(script1)
-    return "output.mtx"
-  }
-
-  def baseTransformHelper(X_file: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): Matrix = {
-    val isSingleNode = true
-    val ml           = new MLContext(sc)
-    updateML(ml)
-    val readScript      = dml(dmlRead("X", X_file)).out("X")
-         val res = ml.execute(readScript)
-    val script = getPredictionScript(isSingleNode)
-    val modelPredict = ml.execute(script._1.in(script._2, res.getMatrix("X")))
-    return modelPredict.getMatrix(probVar)
-  }
-  
-  def replacePredictionWithProb(script: (Script, String), probVar: String, C: Int, H: Int, W: Int): Unit = {
-    // Append prediction code:
-    val newDML = "source(\"nn/util.dml\") as util;\n" + 
-      script._1.getScriptString + 
-      "\nPrediction = util::predict_class(" + probVar + ", " + C + ", " + H + 
", " + W + ");"
-    script._1.setScriptString(newDML)
-    
-    // Modify the output variables -> remove probability matrix and add Prediction
-    val outputVariables = new java.util.HashSet[String](script._1.getOutputVariables)
-    outputVariables.remove(probVar)
-    outputVariables.add("Prediction")
-    script._1.clearOutputs()
-    script._1.out(outputVariables.toList)
-  }
-
-  def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock = {
-    val isSingleNode = true
-    val ml           = new MLContext(sc)
+trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
+  // Helper method that executes the prediction script:
+  def executePredictionScript(sc: SparkContext, C: Int, H: Int, W: Int, isSingleNode: Boolean,
+      outputProbability: Boolean, probVar: String,
+      addInputOutput: (Script, String) => Script): Matrix = {
+    val ml = new MLContext(sc)
     updateML(ml)
+    // getPredictionScript sets the hyperparameter as well as the output parameter
     val script = getPredictionScript(isSingleNode)
-    
-    replacePredictionWithProb(script, probVar, C, H, W)
-    
-    // Now execute the prediction script directly
-    val ret = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros)))
-              .getMatrix("Prediction").toMatrixBlock
-              
-    if (ret.getNumColumns != 1 && H == 1 && W == 1) {
-      throw new RuntimeException("Expected predicted label to be a column vector")
+    if(!outputProbability) {
+      // Append prediction code:
+      val newDML = if(H == 1 && W == 1) {
+        "source(\"nn/util.dml\") as util;\n" +
+        script._1.getScriptString + 
+        "\nPrediction = util::predict_class(" + probVar + ", " + C + ", " + H 
+ ", " + W + ");\n"
+      } else {
+        "\nPrediction = rowIndexMax(" + probVar + ");\n" // predictions are 
1-based
+      }
+      script._1.setScriptString(newDML)
+      // Modify the output variables -> add Prediction (the probability matrix stays registered)
+      val outputVariables = new java.util.HashSet[String](script._1.getOutputVariables)
+      // Register probVar as an output as well, to avoid writing empty metadata files in scripts like Naive Bayes:
+      // outputVariables.remove(probVar)
+      outputVariables.add("Prediction")
+      script._1.clearOutputs()
+      script._1.out(outputVariables.toList)
     }
-    return ret
+    val modelPredict = ml.execute(addInputOutput(script._1, script._2))
+    return modelPredict.getMatrix(if(outputProbability) probVar else "Prediction")
   }
-
-  def baseTransformHelper(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): Matrix = {
-    val isSingleNode = true
-    val ml           = new MLContext(sc)
-    updateML(ml)
-    val script = getPredictionScript(isSingleNode)
-    val modelPredict = ml.execute(script._1.in(script._2, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros)))
-    return modelPredict.getMatrix(probVar)
+  // --------------------------------------------------------------------------------------------------------------
+  // Methods where the input and output probability/predictions are MatrixBlock.
+  def baseTransformHelper(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int, outputProbability: Boolean): MatrixBlock = {
+    val addInputOutput = (script: Script, xVar: String) => {
+      script.in(xVar, X, new MatrixMetadata(X.getNumRows, X.getNumColumns, X.getNonZeros))
+    }
+    return executePredictionScript(sc, C, H, W, true, outputProbability, probVar, addInputOutput).toMatrixBlock
   }
-
+  // Methods that return probabilities:
   def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock =
     baseTransformProbability(X, sc, probVar, -1, 1, 1)
-
-  def baseTransformProbability(X: String, sc: SparkContext, probVar: String): String =
-    baseTransformProbability(X, sc, probVar, -1, 1, 1)
-    
-  def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock =
-    return baseTransformHelper(X, sc, probVar, C, H, W).toMatrixBlock
-
-  def baseTransformProbability(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String = {
-       val Prob =  baseTransformHelper(X, sc, probVar, C, H, W)
-    (new MLContext(sc)).execute(dml(dmlWrite("Prob")).in("Prob", Prob))
+  def baseTransformProbability(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock =
+    baseTransformHelper(X, sc, probVar, C, H, W, true)
+  // Methods that return predictions:
+  def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String): MatrixBlock =
+    baseTransform(X, sc, probVar, -1, 1, 1)
+  def baseTransform(X: MatrixBlock, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): MatrixBlock =
+    baseTransformHelper(X, sc, probVar, C, H, W, false)
+  // --------------------------------------------------------------------------------------------------------------
+  // Methods where the input is a file path and output probability/predictions are returned as a file path.
+  def baseTransformHelper(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int, outputProbability: Boolean): String = {
+    val ml = new MLContext(sc)
+    updateML(ml)
+    val addInputOutput = (script: Script, xVar: String) => {
+      // Execution 1: Read X from the file system using MLContext
+      val readScript = dml(dmlRead("X", X)).out("X")
+      script.in(xVar, ml.execute(readScript).getMatrix("X"))
+    }
+    // Execution 2: Execute the prediction script
+    val Prob = executePredictionScript(sc, C, H, W, true, outputProbability, probVar, addInputOutput)
+    // Execution 3: Execute the write script to dump the matrix Prob
+    ml.execute(dml(dmlWrite("Prob")).in("Prob", Prob))
     "output.mtx"
   }
-                       
+  // Methods that return probabilities:
+  def baseTransformProbability(X: String, sc: SparkContext, probVar: String): String =
+    baseTransformProbability(X, sc, probVar, -1, 1, 1)
+  def baseTransformProbability(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String =
+    baseTransformHelper(X, sc, probVar, C, H, W, true)
+  // Methods that return predictions:
+  def baseTransform(X_file: String, sc: SparkContext, probVar: String): String =
+    baseTransform(X_file, sc, probVar, -1, 1, 1)
+  def baseTransform(X: String, sc: SparkContext, probVar: String, C: Int, H: Int, W: Int): String =
+    baseTransformHelper(X, sc, probVar, C, H, W, false)
+  // --------------------------------------------------------------------------------------------------------------
+  // Methods where the input and output probability/predictions are DataFrame.
   def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean = true): DataFrame =
     baseTransform(df, sc, probVar, outputProb, -1, 1, 1)
-
   def baseTransformHelper(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean, C: Int, H: Int, W: Int): Matrix = {
     val isSingleNode = false
     val ml           = new MLContext(sc)
@@ -356,7 +345,6 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
     val modelPredict = ml.execute(script._1.in(script._2, Xin_bin))
     return modelPredict.getMatrix(probVar)
   }
-
   def baseTransform(df: ScriptsUtils.SparkDataType, sc: SparkContext, probVar: String, outputProb: Boolean, C: Int, H: Int, W: Int): DataFrame = {
     val Prob = baseTransformHelper(df, sc, probVar, outputProb, C, H, W)
     val script1 = dml("source(\"nn/util.dml\") as util; Prediction = util::predict_class(Prob, C, H, W);")
@@ -367,7 +355,6 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
       .in("W", W)
     val predLabelOut = (new MLContext(sc)).execute(script1)
     val predictedDF  = predLabelOut.getDataFrame("Prediction").select(RDDConverterUtils.DF_ID_COLUMN, "C1").withColumnRenamed("C1", "prediction")
-
     if (outputProb) {
       val prob    = Prob.toDFVectorWithIDColumn().withColumnRenamed("C1", "probability").select(RDDConverterUtils.DF_ID_COLUMN, "probability")
       val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
@@ -376,6 +363,5 @@ trait BaseSystemMLClassifierModel extends BaseSystemMLEstimatorModel {
       val dataset = RDDConverterUtilsExt.addIDToDataFrame(df.asInstanceOf[DataFrame], df.sparkSession, RDDConverterUtils.DF_ID_COLUMN)
       return PredictionUtils.joinUsingID(dataset, predictedDF)
     }
-
   }
 }
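
Design note on the refactoring above: the near-duplicate baseTransform /
baseTransformProbability variants now funnel into a single
executePredictionScript driver, and each input type (in-memory MatrixBlock
vs. file path) contributes only a small addInputOutput callback that binds X
into the script. A Python analogue of the pattern, with hypothetical names,
for illustration only:

    def execute_prediction_script(output_probability, bind_input):
        script = {"inputs": {}, "outputs": ["Prob"]}
        bind_input(script)  # input-type-specific binding step
        if not output_probability:
            script["outputs"].append("Prediction")
        return script       # stand-in for ml.execute(...)

    def transform_matrix_block(X, output_probability):
        return execute_prediction_script(output_probability,
                                         lambda s: s["inputs"].update(X=X))

    def transform_file(path, output_probability):
        return execute_prediction_script(output_probability,
                                         lambda s: s["inputs"].update(X=("read", path)))
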
