Repository: spark
Updated Branches:
  refs/heads/master d2932a0e9 -> 2fbdb6063


[SPARK-16445][MLLIB][SPARKR] Multilayer Perceptron Classifier wrapper in SparkR

https://issues.apache.org/jira/browse/SPARK-16445

## What changes were proposed in this pull request?

Create Multilayer Perceptron Classifier wrapper in SparkR

## How was this patch tested?

Tested manually on local machine

Author: Xin Ren <iamsh...@126.com>

Closes #14447 from keypointt/SPARK-16445.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fbdb606
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fbdb606
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fbdb606

Branch: refs/heads/master
Commit: 2fbdb606392631b1dff88ec86f388cc2559c28f5
Parents: d2932a0
Author: Xin Ren <iamsh...@126.com>
Authored: Wed Aug 24 11:18:10 2016 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Wed Aug 24 11:18:10 2016 -0700

----------------------------------------------------------------------
 R/pkg/NAMESPACE                                 |   1 +
 R/pkg/R/generics.R                              |   4 +
 R/pkg/R/mllib.R                                 | 125 ++++++++++++++++-
 R/pkg/inst/tests/testthat/test_mllib.R          |  32 +++++
 .../MultilayerPerceptronClassifierWrapper.scala | 134 +++++++++++++++++++
 .../scala/org/apache/spark/ml/r/RWrappers.scala |   2 +
 6 files changed, 293 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 7090576..ad587a6 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -27,6 +27,7 @@ exportMethods("glm",
               "summary",
               "spark.kmeans",
               "fitted",
+              "spark.mlp",
               "spark.naiveBayes",
               "spark.survreg",
               "spark.lda",

http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 88884e6..7e626be 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1330,6 +1330,10 @@ setGeneric("spark.kmeans", function(data, formula, ...) 
{ standardGeneric("spark
 #' @export
 setGeneric("fitted")
 
+#' @rdname spark.mlp
+#' @export
+setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") })
+
 #' @rdname spark.naiveBayes
 #' @export
 setGeneric("spark.naiveBayes", function(data, formula, ...) { 
standardGeneric("spark.naiveBayes") })

http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/R/mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index a40310d..a670600 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -60,6 +60,13 @@ setClass("AFTSurvivalRegressionModel", representation(jobj = 
"jobj"))
 #' @note KMeansModel since 2.0.0
 setClass("KMeansModel", representation(jobj = "jobj"))
 
+#' S4 class that represents a MultilayerPerceptronClassificationModel
+#'
+#' @param jobj a Java object reference to the backing Scala 
MultilayerPerceptronClassifierWrapper
+#' @export
+#' @note MultilayerPerceptronClassificationModel since 2.1.0
+setClass("MultilayerPerceptronClassificationModel", representation(jobj = 
"jobj"))
+
 #' S4 class that represents an IsotonicRegressionModel
 #'
 #' @param jobj a Java object reference to the backing Scala 
IsotonicRegressionModel
@@ -90,7 +97,7 @@ setClass("ALSModel", representation(jobj = "jobj"))
 #' @export
 #' @seealso \link{spark.glm}, \link{glm},
 #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, 
\link{spark.isoreg}, \link{spark.kmeans},
-#' @seealso \link{spark.lda}, \link{spark.naiveBayes}, \link{spark.survreg},
+#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, 
\link{spark.survreg}
 #' @seealso \link{read.ml}
 NULL
 
@@ -103,7 +110,7 @@ NULL
 #' @export
 #' @seealso \link{spark.glm}, \link{glm},
 #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, 
\link{spark.isoreg}, \link{spark.kmeans},
-#' @seealso \link{spark.naiveBayes}, \link{spark.survreg},
+#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg}
 NULL
 
 write_internal <- function(object, path, overwrite = FALSE) {
@@ -631,6 +638,95 @@ setMethod("predict", signature(object = "KMeansModel"),
             predict_internal(object, newData)
           })
 
+#' Multilayer Perceptron Classification Model
+#'
+#' \code{spark.mlp} fits a multi-layer perceptron neural network model against 
a SparkDataFrame.
+#' Users can call \code{summary} to print a summary of the fitted model, 
\code{predict} to make
+#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load 
fitted models.
+#' Only categorical data is supported.
+#' For more details, see
+#' 
\href{http://spark.apache.org/docs/latest/ml-classification-regression.html}{
+#'   Multilayer Perceptron}
+#'
+#' @param data a \code{SparkDataFrame} of observations and labels for model 
fitting.
+#' @param blockSize blockSize parameter.
+#' @param layers integer vector containing the number of nodes for each layer
+#' @param solver solver parameter, supported options: "gd" (minibatch gradient 
descent) or "l-bfgs".
+#' @param maxIter maximum iteration number.
+#' @param tol convergence tolerance of iterations.
+#' @param stepSize stepSize parameter.
+#' @param seed seed parameter for weights initialization.
+#' @param ... additional arguments passed to the method.
+#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron 
Classification Model.
+#' @rdname spark.mlp
+#' @aliases spark.mlp,SparkDataFrame-method
+#' @name spark.mlp
+#' @seealso \link{read.ml}
+#' @export
+#' @examples
+#' \dontrun{
+#' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", 
source = "libsvm")
+#'
+#' # fit a Multilayer Perceptron Classification Model
+#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = 
"l-bfgs",
+#'                    maxIter = 100, tol = 0.5, stepSize = 1, seed = 1)
+#'
+#' # get the summary of the model
+#' summary(model)
+#'
+#' # make predictions
+#' predictions <- predict(model, df)
+#'
+#' # save and load the model
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.mlp since 2.1.0
+setMethod("spark.mlp", signature(data = "SparkDataFrame"),
+          function(data, blockSize = 128, layers = c(3, 5, 2), solver = 
"l-bfgs", maxIter = 100,
+                   tol = 0.5, stepSize = 1, seed = 1) {
+            jobj <- 
callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper",
+                                "fit", data@sdf, as.integer(blockSize), 
as.array(layers),
+                                as.character(solver), as.integer(maxIter), 
as.numeric(tol),
+                                as.numeric(stepSize), as.integer(seed))
+            new("MultilayerPerceptronClassificationModel", jobj = jobj)
+          })
+
+# Makes predictions from a model produced by spark.mlp().
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns a SparkDataFrame containing predicted 
labels in a column named
+#' "prediction".
+#' @rdname spark.mlp
+#' @aliases predict,MultilayerPerceptronClassificationModel-method
+#' @export
+#' @note predict(MultilayerPerceptronClassificationModel) since 2.1.0
+setMethod("predict", signature(object = 
"MultilayerPerceptronClassificationModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+# Returns the summary of a Multilayer Perceptron Classification Model produced 
by \code{spark.mlp}
+
+#' @param object a Multilayer Perceptron Classification Model fitted by 
\code{spark.mlp}
+#' @return \code{summary} returns a list containing \code{labelCount}, the number 
of distinct
+#'         labels, \code{layers}, the number of nodes in each layer, and \code{weights}.
+#' @rdname spark.mlp
+#' @export
+#' @aliases summary,MultilayerPerceptronClassificationModel-method
+#' @note summary(MultilayerPerceptronClassificationModel) since 2.1.0
+setMethod("summary", signature(object = 
"MultilayerPerceptronClassificationModel"),
+          function(object) {
+            jobj <- object@jobj
+            labelCount <- callJMethod(jobj, "labelCount")
+            layers <- unlist(callJMethod(jobj, "layers"))
+            weights <- callJMethod(jobj, "weights")
+            weights <- matrix(weights, nrow = length(weights))
+            list(labelCount = labelCount, layers = layers, weights = weights)
+          })
+
 #' Naive Bayes Models
 #'
 #' \code{spark.naiveBayes} fits a Bernoulli naive Bayes model against a 
SparkDataFrame.
@@ -685,7 +781,7 @@ setMethod("spark.naiveBayes", signature(data = 
"SparkDataFrame", formula = "form
 #'
 #' @rdname spark.naiveBayes
 #' @export
-#' @seealso \link{read.ml}
+#' @seealso \link{write.ml}
 #' @note write.ml(NaiveBayesModel, character) since 2.0.0
 setMethod("write.ml", signature(object = "NaiveBayesModel", path = 
"character"),
           function(object, path, overwrite = FALSE) {
@@ -700,7 +796,7 @@ setMethod("write.ml", signature(object = "NaiveBayesModel", 
path = "character"),
 #' @rdname spark.survreg
 #' @export
 #' @note write.ml(AFTSurvivalRegressionModel, character) since 2.0.0
-#' @seealso \link{read.ml}
+#' @seealso \link{write.ml}
 setMethod("write.ml", signature(object = "AFTSurvivalRegressionModel", path = 
"character"),
           function(object, path, overwrite = FALSE) {
             write_internal(object, path, overwrite)
@@ -734,6 +830,23 @@ setMethod("write.ml", signature(object = "KMeansModel", 
path = "character"),
             write_internal(object, path, overwrite)
           })
 
+# Saves the Multilayer Perceptron Classification Model to the input path.
+
+#' @param path the directory where the model is saved.
+#' @param overwrite whether to overwrite the output path if it already exists. 
Default is FALSE,
+#'                  which means an exception is thrown if the output path exists.
+#'
+#' @rdname spark.mlp
+#' @aliases write.ml,MultilayerPerceptronClassificationModel,character-method
+#' @export
+#' @seealso \link{write.ml}
+#' @note write.ml(MultilayerPerceptronClassificationModel, character) since 
2.1.0
+setMethod("write.ml", signature(object = 
"MultilayerPerceptronClassificationModel",
+          path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
 #  Save fitted IsotonicRegressionModel to the input path
 
 #' @param path The directory where the model is saved
@@ -791,6 +904,8 @@ read.ml <- function(path) {
     new("KMeansModel", jobj = jobj)
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) {
     new("LDAModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, 
"org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper")) {
+    new("MultilayerPerceptronClassificationModel", jobj = jobj)
   } else if (isInstanceOf(jobj, 
"org.apache.spark.ml.r.IsotonicRegressionWrapper")) {
     new("IsotonicRegressionModel", jobj = jobj)
   } else if (isInstanceOf(jobj, 
"org.apache.spark.ml.r.GaussianMixtureWrapper")) {
@@ -798,7 +913,7 @@ read.ml <- function(path) {
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) {
     new("ALSModel", jobj = jobj)
   } else {
-    stop(paste("Unsupported model: ", jobj))
+    stop("Unsupported model: ", jobj)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/inst/tests/testthat/test_mllib.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R 
b/R/pkg/inst/tests/testthat/test_mllib.R
index de9bd48..1e6da65 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -347,6 +347,38 @@ test_that("spark.kmeans", {
   unlink(modelPath)
 })
 
+test_that("spark.mlp", {
+  df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source 
= "libsvm")
+  model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = 
"l-bfgs", maxIter = 100,
+                     tol = 0.5, stepSize = 1, seed = 1)
+
+  # Test summary method
+  summary <- summary(model)
+  expect_equal(summary$labelCount, 3)
+  expect_equal(summary$layers, c(4, 5, 4, 3))
+  expect_equal(length(summary$weights), 64)
+
+  # Test predict method
+  mlpTestDF <- df
+  mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction"))
+  expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1))
+
+  # Test model save/load
+  modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp")
+  write.ml(model, modelPath)
+  expect_error(write.ml(model, modelPath))
+  write.ml(model, modelPath, overwrite = TRUE)
+  model2 <- read.ml(modelPath)
+  summary2 <- summary(model2)
+
+  expect_equal(summary2$labelCount, 3)
+  expect_equal(summary2$layers, c(4, 5, 4, 3))
+  expect_equal(length(summary2$weights), 64)
+
+  unlink(modelPath)
+
+})
+
 test_that("spark.naiveBayes", {
   # R code to reproduce the result.
   # We do not support instance weights yet. So we ignore the frequencies.

http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
new file mode 100644
index 0000000..be51e74
--- /dev/null
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.r
+
+import org.apache.hadoop.fs.Path
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.ml.{Pipeline, PipelineModel}
+import 
org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, 
MultilayerPerceptronClassifier}
+import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter}
+import org.apache.spark.sql.{DataFrame, Dataset}
+
+private[r] class MultilayerPerceptronClassifierWrapper private (
+    val pipeline: PipelineModel,
+    val labelCount: Long,
+    val layers: Array[Int],
+    val weights: Array[Double]
+  ) extends MLWritable {
+
+  def transform(dataset: Dataset[_]): DataFrame = {
+    pipeline.transform(dataset)
+  }
+
+  /**
+   * Returns an [[MLWriter]] instance for this ML instance.
+   */
+  override def write: MLWriter =
+    new 
MultilayerPerceptronClassifierWrapper.MultilayerPerceptronClassifierWrapperWriter(this)
+}
+
+private[r] object MultilayerPerceptronClassifierWrapper
+  extends MLReadable[MultilayerPerceptronClassifierWrapper] {
+
+  val PREDICTED_LABEL_COL = "prediction"
+
+  def fit(
+      data: DataFrame,
+      blockSize: Int,
+      layers: Array[Double],
+      solver: String,
+      maxIter: Int,
+      tol: Double,
+      stepSize: Double,
+      seed: Int
+     ): MultilayerPerceptronClassifierWrapper = {
+    // get labels and feature names from output schema
+    val schema = data.schema
+
+    // assemble and fit the pipeline
+    val mlp = new MultilayerPerceptronClassifier()
+      .setLayers(layers.map(_.toInt))
+      .setBlockSize(blockSize)
+      .setSolver(solver)
+      .setMaxIter(maxIter)
+      .setTol(tol)
+      .setStepSize(stepSize)
+      .setSeed(seed)
+      .setPredictionCol(PREDICTED_LABEL_COL)
+    val pipeline = new Pipeline()
+      .setStages(Array(mlp))
+      .fit(data)
+
+    val multilayerPerceptronClassificationModel: 
MultilayerPerceptronClassificationModel =
+    pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel]
+
+    val weights = multilayerPerceptronClassificationModel.weights.toArray
+    val layersFromPipeline = multilayerPerceptronClassificationModel.layers
+    val labelCount = data.select("label").distinct().count()
+
+    new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, 
layersFromPipeline, weights)
+  }
+
+  /**
+   * Returns an [[MLReader]] instance for this class.
+   */
+  override def read: MLReader[MultilayerPerceptronClassifierWrapper] =
+    new MultilayerPerceptronClassifierWrapperReader
+
+  override def load(path: String): MultilayerPerceptronClassifierWrapper = 
super.load(path)
+
+  class MultilayerPerceptronClassifierWrapperReader
+    extends MLReader[MultilayerPerceptronClassifierWrapper]{
+
+    override def load(path: String): MultilayerPerceptronClassifierWrapper = {
+      implicit val format = DefaultFormats
+      val rMetadataPath = new Path(path, "rMetadata").toString
+      val pipelinePath = new Path(path, "pipeline").toString
+
+      val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
+      val rMetadata = parse(rMetadataStr)
+      val labelCount = (rMetadata \ "labelCount").extract[Long]
+      val layers = (rMetadata \ "layers").extract[Array[Int]]
+      val weights = (rMetadata \ "weights").extract[Array[Double]]
+
+      val pipeline = PipelineModel.load(pipelinePath)
+      new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, 
weights)
+    }
+  }
+
+  class MultilayerPerceptronClassifierWrapperWriter(instance: 
MultilayerPerceptronClassifierWrapper)
+    extends MLWriter {
+
+    override protected def saveImpl(path: String): Unit = {
+      val rMetadataPath = new Path(path, "rMetadata").toString
+      val pipelinePath = new Path(path, "pipeline").toString
+
+      val rMetadata = ("class" -> instance.getClass.getName) ~
+        ("labelCount" -> instance.labelCount) ~
+        ("layers" -> instance.layers.toSeq) ~
+        ("weights" -> instance.weights.toArray.toSeq)
+      val rMetadataJson: String = compact(render(rMetadata))
+      sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)
+
+      instance.pipeline.save(pipelinePath)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala 
b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
index 51a65f7..d64de1b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala
@@ -44,6 +44,8 @@ private[r] object RWrappers extends MLReader[Object] {
         GeneralizedLinearRegressionWrapper.load(path)
       case "org.apache.spark.ml.r.KMeansWrapper" =>
         KMeansWrapper.load(path)
+      case "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper" =>
+        MultilayerPerceptronClassifierWrapper.load(path)
       case "org.apache.spark.ml.r.LDAWrapper" =>
         LDAWrapper.load(path)
       case "org.apache.spark.ml.r.IsotonicRegressionWrapper" =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to