Repository: spark
Updated Branches:
  refs/heads/master 4ac9759f8 -> 95eb06bd7


[SPARK-18438][SPARKR][ML] spark.mlp should support RFormula.

## What changes were proposed in this pull request?
```spark.mlp``` should support ```RFormula``` like the other ML algorithm wrappers do. This patch also includes some cleanup and improvements for ```spark.mlp```.
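For reference, a minimal sketch of the updated API, mirroring the roxygen example added in this patch (the dataset path assumes a Spark checkout and an active SparkR session):

```r
# Load the sample multiclass dataset used throughout this patch's examples.
df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")

# spark.mlp now takes an R formula as its second argument, like the other ML wrappers.
model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
                   maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)

# Predictions now come back as the original string labels (decoded via IndexToString).
head(select(predict(model, df), "prediction"))
```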

## How was this patch tested?
Unit tests.

Author: Yanbo Liang <yblia...@gmail.com>

Closes #15883 from yanboliang/spark-18438.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/95eb06bd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/95eb06bd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/95eb06bd

Branch: refs/heads/master
Commit: 95eb06bd7d0f7110ef62c8d1cb6337c72b10d99f
Parents: 4ac9759
Author: Yanbo Liang <yblia...@gmail.com>
Authored: Wed Nov 16 01:04:18 2016 -0800
Committer: Yanbo Liang <yblia...@gmail.com>
Committed: Wed Nov 16 01:04:18 2016 -0800

----------------------------------------------------------------------
 R/pkg/R/generics.R                              |  2 +-
 R/pkg/R/mllib.R                                 | 30 ++++++----
 R/pkg/inst/tests/testthat/test_mllib.R          | 63 +++++++++++++-------
 .../MultilayerPerceptronClassifierWrapper.scala | 61 ++++++++++---------
 4 files changed, 96 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/95eb06bd/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 7653ca7..499c7b2 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1373,7 +1373,7 @@ setGeneric("spark.logit", function(data, formula, ...) { standardGeneric("spark.
 
 #' @rdname spark.mlp
 #' @export
-setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })
+setGeneric("spark.mlp", function(data, formula, ...) { 
standardGeneric("spark.mlp") })
 
 #' @rdname spark.naiveBayes
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/95eb06bd/R/pkg/R/mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 1065b4b..265e64e 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -525,7 +525,7 @@ setMethod("write.ml", signature(object = "LDAModel", path = "character"),
 #' @note spark.isoreg since 2.1.0
 setMethod("spark.isoreg", signature(data = "SparkDataFrame", formula = 
"formula"),
           function(data, formula, isotonic = TRUE, featureIndex = 0, weightCol 
= NULL) {
-            formula <- paste0(deparse(formula), collapse = "")
+            formula <- paste(deparse(formula), collapse = "")
 
             if (is.null(weightCol)) {
               weightCol <- ""
@@ -775,7 +775,7 @@ setMethod("spark.logit", signature(data = "SparkDataFrame", formula = "formula")
                    tol = 1E-6, fitIntercept = TRUE, family = "auto", standardization = TRUE,
                    thresholds = 0.5, weightCol = NULL, aggregationDepth = 2,
                    probabilityCol = "probability") {
-            formula <- paste0(deparse(formula), collapse = "")
+            formula <- paste(deparse(formula), collapse = "")
 
             if (is.null(weightCol)) {
               weightCol <- ""
@@ -858,6 +858,8 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #'   Multilayer Perceptron}
 #'
 #' @param data a \code{SparkDataFrame} of observations and labels for model fitting.
+#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', '.', ':', '+', and '-'.
 #' @param blockSize blockSize parameter.
 #' @param layers integer vector containing the number of nodes for each layer
 #' @param solver solver parameter, supported options: "gd" (minibatch gradient descent) or "l-bfgs".
@@ -870,7 +872,7 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #' @param ... additional arguments passed to the method.
 #' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model.
 #' @rdname spark.mlp
-#' @aliases spark.mlp,SparkDataFrame-method
+#' @aliases spark.mlp,SparkDataFrame,formula-method
 #' @name spark.mlp
 #' @seealso \link{read.ml}
 #' @export
@@ -879,7 +881,7 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
 #'
 #' # fit a Multilayer Perceptron Classification Model
-#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
+#' model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 3), solver = "l-bfgs",
 #'                    maxIter = 100, tol = 0.5, stepSize = 1, seed = 1,
 #'                    initialWeights = c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
 #'
@@ -896,9 +898,10 @@ setMethod("summary", signature(object = "LogisticRegressionModel"),
 #' summary(savedModel)
 #' }
 #' @note spark.mlp since 2.1.0
-setMethod("spark.mlp", signature(data = "SparkDataFrame"),
-          function(data, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
+setMethod("spark.mlp", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, layers, blockSize = 128, solver = "l-bfgs", maxIter = 100,
                    tol = 1E-6, stepSize = 0.03, seed = NULL, initialWeights = NULL) {
+            formula <- paste(deparse(formula), collapse = "")
             if (is.null(layers)) {
               stop ("layers must be a integer vector with length > 1.")
             }
@@ -913,7 +916,7 @@ setMethod("spark.mlp", signature(data = "SparkDataFrame"),
               initialWeights <- as.array(as.numeric(na.omit(initialWeights)))
             }
             jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
-                                "fit", data@sdf, as.integer(blockSize), as.array(layers),
+                                "fit", data@sdf, formula, as.integer(blockSize), as.array(layers),
                                 as.character(solver), as.integer(maxIter), as.numeric(tol),
                                 as.numeric(stepSize), seed, initialWeights)
             new("MultilayerPerceptronClassificationModel", jobj = jobj)
@@ -936,9 +939,10 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
 # Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp}
 
 #' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp}
-#' @return \code{summary} returns a list containing \code{labelCount}, \code{layers}, and
-#'         \code{weights}. For \code{weights}, it is a numeric vector with length equal to
-#'         the expected given the architecture (i.e., for 8-10-2 network, 100 connection weights).
+#' @return \code{summary} returns a list containing \code{numOfInputs}, \code{numOfOutputs},
+#'         \code{layers}, and \code{weights}. For \code{weights}, it is a numeric vector with
+#'         length equal to the expected given the architecture (i.e., for 8-10-2 network,
+#'         112 connection weights).
 #' @rdname spark.mlp
 #' @export
 #' @aliases summary,MultilayerPerceptronClassificationModel-method
@@ -946,10 +950,12 @@ setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel
 setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel"),
           function(object) {
             jobj <- object@jobj
-            labelCount <- callJMethod(jobj, "labelCount")
             layers <- unlist(callJMethod(jobj, "layers"))
+            numOfInputs <- head(layers, n = 1)
+            numOfOutputs <- tail(layers, n = 1)
             weights <- callJMethod(jobj, "weights")
-            list(labelCount = labelCount, layers = layers, weights = weights)
+            list(numOfInputs = numOfInputs, numOfOutputs = numOfOutputs,
+                 layers = layers, weights = weights)
           })
 
 #' Naive Bayes Models

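A brief SparkR sketch of the ```summary``` change above, using the model from the example earlier (output values are illustrative, not from a real run):

```r
# summary() now derives the sizes from the layers vector itself:
# numOfInputs is the first element, numOfOutputs the last.
s <- summary(model)
s$numOfInputs        # 4 for the sample dataset
s$numOfOutputs       # 3
s$layers             # c(4, 3)
length(s$weights)    # 4 * 3 + 3 = 15 connection weights for a 4-3 network
```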
http://git-wip-us.apache.org/repos/asf/spark/blob/95eb06bd/R/pkg/inst/tests/testthat/test_mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index 07df4b6..2a97a51 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -371,12 +371,13 @@ test_that("spark.kmeans", {
 test_that("spark.mlp", {
   df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
                 source = "libsvm")
-  model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100,
-                     tol = 0.5, stepSize = 1, seed = 1)
+  model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3),
+                     solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
 
   # Test summary method
   summary <- summary(model)
-  expect_equal(summary$labelCount, 3)
+  expect_equal(summary$numOfInputs, 4)
+  expect_equal(summary$numOfOutputs, 3)
   expect_equal(summary$layers, c(4, 5, 4, 3))
   expect_equal(length(summary$weights), 64)
   expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825),
@@ -385,7 +386,7 @@ test_that("spark.mlp", {
   # Test predict method
   mlpTestDF <- df
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1))
+  expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0"))
 
   # Test model save/load
   modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
@@ -395,46 +396,68 @@ test_that("spark.mlp", {
   model2 <- read.ml(modelPath)
   summary2 <- summary(model2)
 
-  expect_equal(summary2$labelCount, 3)
+  expect_equal(summary2$numOfInputs, 4)
+  expect_equal(summary2$numOfOutputs, 3)
   expect_equal(summary2$layers, c(4, 5, 4, 3))
   expect_equal(length(summary2$weights), 64)
 
   unlink(modelPath)
 
   # Test default parameter
-  model <- spark.mlp(df, layers = c(4, 5, 4, 3))
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 10), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 0))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
   # Test illegal parameter
-  expect_error(spark.mlp(df, layers = NULL), "layers must be a integer vector with length > 1.")
-  expect_error(spark.mlp(df, layers = c()), "layers must be a integer vector with length > 1.")
-  expect_error(spark.mlp(df, layers = c(3)), "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = NULL),
+               "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = c()),
+               "layers must be a integer vector with length > 1.")
+  expect_error(spark.mlp(df, label ~ features, layers = c(3)),
+               "layers must be a integer vector with length > 1.")
 
   # Test random seed
   # default seed
-  model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 2, 2, 1, 2, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
   # seed equals 10
-  model <- spark.mlp(df, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 2, 1, 2, 2, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
   # test initialWeights
-  model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
     c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
-  model <- spark.mlp(df, layers = c(4, 3), maxIter = 2, initialWeights =
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights =
     c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0))
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 0, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0"))
 
-  model <- spark.mlp(df, layers = c(4, 3), maxIter = 2)
+  model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2)
   mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
-  expect_equal(head(mlpPredictions$prediction, 12), c(1, 1, 1, 1, 0, 1, 0, 2, 1, 0, 0, 1))
+  expect_equal(head(mlpPredictions$prediction, 10),
+               c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0"))
+
+  # Test formula works well
+  df <- suppressWarnings(createDataFrame(iris))
+  model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
+                     layers = c(4, 3))
+  summary <- summary(model)
+  expect_equal(summary$numOfInputs, 4)
+  expect_equal(summary$numOfOutputs, 3)
+  expect_equal(summary$layers, c(4, 3))
+  expect_equal(length(summary$weights), 15)
+  expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413,
+               -10.2376130), tolerance = 1e-6)
 })
 
 test_that("spark.naiveBayes", {

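The iris test above also exercises the formula support with explicit feature columns; a condensed sketch (note that ```createDataFrame``` renames iris columns, so the underscore forms like ```Sepal_Length``` are the actual column names):

```r
irisDF <- suppressWarnings(createDataFrame(iris))
model <- spark.mlp(irisDF, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width,
                   layers = c(4, 3))
# Predictions are returned as the original string labels, e.g. "versicolor".
head(collect(select(predict(model, irisDF), "prediction")))
```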
http://git-wip-us.apache.org/repos/asf/spark/blob/95eb06bd/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
index 2193eb8..d34de30 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -24,19 +24,29 @@ import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.ml.{Pipeline, PipelineModel}
 import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier}
+import org.apache.spark.ml.feature.{IndexToString, RFormula}
 import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.ml.r.RWrapperUtils._
 import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
 import org.apache.spark.sql.{DataFrame, Dataset}
 
 private[r] class MultilayerPerceptronClassifierWrapper private (
-    val pipeline: PipelineModel,
-    val labelCount: Long,
-    val layers: Array[Int],
-    val weights: Array[Double]
+    val pipeline: PipelineModel
   ) extends MLWritable {
 
+  import MultilayerPerceptronClassifierWrapper._
+
+  val mlpModel: MultilayerPerceptronClassificationModel =
+    pipeline.stages(1).asInstanceOf[MultilayerPerceptronClassificationModel]
+
+  val weights: Array[Double] = mlpModel.weights.toArray
+  val layers: Array[Int] = mlpModel.layers
+
   def transform(dataset: Dataset[_]): DataFrame = {
     pipeline.transform(dataset)
+      .drop(mlpModel.getFeaturesCol)
+      .drop(mlpModel.getLabelCol)
+      .drop(PREDICTED_LABEL_INDEX_COL)
   }
 
   /**
@@ -49,10 +59,12 @@ private[r] class MultilayerPerceptronClassifierWrapper private (
 private[r] object MultilayerPerceptronClassifierWrapper
   extends MLReadable[MultilayerPerceptronClassifierWrapper] {
 
+  val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
   val PREDICTED_LABEL_COL = "prediction"
 
   def fit(
       data: DataFrame,
+      formula: String,
       blockSize: Int,
       layers: Array[Int],
       solver: String,
@@ -62,8 +74,13 @@ private[r] object MultilayerPerceptronClassifierWrapper
       seed: String,
       initialWeights: Array[Double]
      ): MultilayerPerceptronClassifierWrapper = {
+    val rFormula = new RFormula()
+      .setFormula(formula)
+      .setForceIndexLabel(true)
+    checkDataColumns(rFormula, data)
+    val rFormulaModel = rFormula.fit(data)
     // get labels and feature names from output schema
-    val schema = data.schema
+    val (_, labels) = getFeaturesAndLabels(rFormulaModel, data)
 
     // assemble and fit the pipeline
     val mlp = new MultilayerPerceptronClassifier()
@@ -73,25 +90,25 @@ private[r] object MultilayerPerceptronClassifierWrapper
       .setMaxIter(maxIter)
       .setTol(tol)
       .setStepSize(stepSize)
-      .setPredictionCol(PREDICTED_LABEL_COL)
+      .setFeaturesCol(rFormula.getFeaturesCol)
+      .setLabelCol(rFormula.getLabelCol)
+      .setPredictionCol(PREDICTED_LABEL_INDEX_COL)
     if (seed != null && seed.length > 0) mlp.setSeed(seed.toInt)
     if (initialWeights != null) {
       require(initialWeights.length > 0)
       mlp.setInitialWeights(Vectors.dense(initialWeights))
     }
 
+    val idxToStr = new IndexToString()
+      .setInputCol(PREDICTED_LABEL_INDEX_COL)
+      .setOutputCol(PREDICTED_LABEL_COL)
+      .setLabels(labels)
+
     val pipeline = new Pipeline()
-      .setStages(Array(mlp))
+      .setStages(Array(rFormulaModel, mlp, idxToStr))
       .fit(data)
 
-    val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel =
-    pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]
-
-    val weights = multilayerPerceptronClassificationModel.weights.toArray
-    val layersFromPipeline = multilayerPerceptronClassificationModel.layers
-    val labelCount = data.select("label").distinct().count()
-
-    new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights)
+    new MultilayerPerceptronClassifierWrapper(pipeline)
   }
 
   /**
@@ -107,17 +124,10 @@ private[r] object MultilayerPerceptronClassifierWrapper
 
     override def load(path: String): MultilayerPerceptronClassifierWrapper = {
       implicit val format = DefaultFormats
-      val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
-      val rMetadata = parse(rMetadataStr)
-      val labelCount = (rMetadata \ "labelCount").extract[Long]
-      val layers = (rMetadata \ "layers").extract[Array[Int]]
-      val weights = (rMetadata \ "weights").extract[Array[Double]]
-
       val pipeline = PipelineModel.load(pipelinePath)
-      new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights)
+      new MultilayerPerceptronClassifierWrapper(pipeline)
     }
   }
 
@@ -128,10 +138,7 @@ private[r] object MultilayerPerceptronClassifierWrapper
       val rMetadataPath = new Path(path, "rMetadata").toString
       val pipelinePath = new Path(path, "pipeline").toString
 
-      val rMetadata = ("class" -> instance.getClass.getName) ~
-        ("labelCount" -> instance.labelCount) ~
-        ("layers" -> instance.layers.toSeq) ~
-        ("weights" -> instance.weights.toArray.toSeq)
+      val rMetadata = "class" -> instance.getClass.getName
       val rMetadataJson: String = compact(render(rMetadata))
       sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
 

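Since the wrapper now recovers ```layers``` and ```weights``` from the persisted ```PipelineModel``` rather than from rMetadata, a save/load round trip preserves the summary; a sketch mirroring the test above:

```r
# Metadata no longer stores labelCount/layers/weights; they are recomputed
# from the MultilayerPerceptronClassificationModel stage of the loaded pipeline.
modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
write.ml(model, modelPath)
model2 <- read.ml(modelPath)
stopifnot(identical(summary(model2)$layers, summary(model)$layers))
unlink(modelPath)
```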
