Repository: spark Updated Branches: refs/heads/master d2932a0e9 -> 2fbdb6063
[SPARK-16445][MLLIB][SPARKR] Multilayer Perceptron Classifier wrapper in SparkR https://issues.apache.org/jira/browse/SPARK-16445 ## What changes were proposed in this pull request? Create Multilayer Perceptron Classifier wrapper in SparkR ## How was this patch tested? Tested manually on local machine Author: Xin Ren <iamsh...@126.com> Closes #14447 from keypointt/SPARK-16445. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2fbdb606 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2fbdb606 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2fbdb606 Branch: refs/heads/master Commit: 2fbdb606392631b1dff88ec86f388cc2559c28f5 Parents: d2932a0 Author: Xin Ren <iamsh...@126.com> Authored: Wed Aug 24 11:18:10 2016 -0700 Committer: Felix Cheung <felixche...@apache.org> Committed: Wed Aug 24 11:18:10 2016 -0700 ---------------------------------------------------------------------- R/pkg/NAMESPACE | 1 + R/pkg/R/generics.R | 4 + R/pkg/R/mllib.R | 125 ++++++++++++++++- R/pkg/inst/tests/testthat/test_mllib.R | 32 +++++ .../MultilayerPerceptronClassifierWrapper.scala | 134 +++++++++++++++++++ .../scala/org/apache/spark/ml/r/RWrappers.scala | 2 + 6 files changed, 293 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 7090576..ad587a6 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -27,6 +27,7 @@ exportMethods("glm", "summary", "spark.kmeans", "fitted", + "spark.mlp", "spark.naiveBayes", "spark.survreg", "spark.lda", http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 88884e6..7e626be 100644 --- 
a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -1330,6 +1330,10 @@ setGeneric("spark.kmeans", function(data, formula, ...) { standardGeneric("spark #' @export setGeneric("fitted") +#' @rdname spark.mlp +#' @export +setGeneric("spark.mlp", function(data, ...) { standardGeneric("spark.mlp") }) + #' @rdname spark.naiveBayes #' @export setGeneric("spark.naiveBayes", function(data, formula, ...) { standardGeneric("spark.naiveBayes") }) http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/R/mllib.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index a40310d..a670600 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -60,6 +60,13 @@ setClass("AFTSurvivalRegressionModel", representation(jobj = "jobj")) #' @note KMeansModel since 2.0.0 setClass("KMeansModel", representation(jobj = "jobj")) +#' S4 class that represents a MultilayerPerceptronClassificationModel +#' +#' @param jobj a Java object reference to the backing Scala MultilayerPerceptronClassifierWrapper +#' @export +#' @note MultilayerPerceptronClassificationModel since 2.1.0 +setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj")) + #' S4 class that represents an IsotonicRegressionModel #' #' @param jobj a Java object reference to the backing Scala IsotonicRegressionModel @@ -90,7 +97,7 @@ setClass("ALSModel", representation(jobj = "jobj")) #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso \link{spark.lda}, \link{spark.naiveBayes}, \link{spark.survreg}, +#' @seealso \link{spark.lda}, \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} #' @seealso \link{read.ml} NULL @@ -103,7 +110,7 @@ NULL #' @export #' @seealso \link{spark.glm}, \link{glm}, #' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.isoreg}, \link{spark.kmeans}, -#' @seealso 
\link{spark.naiveBayes}, \link{spark.survreg}, +#' @seealso \link{spark.mlp}, \link{spark.naiveBayes}, \link{spark.survreg} NULL write_internal <- function(object, path, overwrite = FALSE) { @@ -631,6 +638,95 @@ setMethod("predict", signature(object = "KMeansModel"), predict_internal(object, newData) }) +#' Multilayer Perceptron Classification Model +#' +#' \code{spark.mlp} fits a multi-layer perceptron neural network model against a SparkDataFrame. +#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make +#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models. +#' Only categorical labels are supported (this is a classification model). +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html}{ +#' Multilayer Perceptron} +#' +#' @param data a \code{SparkDataFrame} of observations and labels for model fitting. +#' @param blockSize block size for stacking input data in matrices to speed up computation. +#' @param layers integer vector containing the number of nodes for each layer, from input to output. +#' @param solver solver parameter, supported options: "gd" (minibatch gradient descent) or "l-bfgs". +#' @param maxIter maximum iteration number. +#' @param tol convergence tolerance of iterations. +#' @param stepSize step size to be used for each iteration of optimization. +#' @param seed seed parameter for weights initialization. +#' @param ... additional arguments passed to the method. +#' @return \code{spark.mlp} returns a fitted Multilayer Perceptron Classification Model. 
+#' @rdname spark.mlp +#' @aliases spark.mlp,SparkDataFrame-method +#' @name spark.mlp +#' @seealso \link{read.ml} +#' @export +#' @examples +#' \dontrun{ +#' df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm") +#' +#' # fit a Multilayer Perceptron Classification Model +#' model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", +#' maxIter = 100, tol = 0.5, stepSize = 1, seed = 1) +#' +#' # get the summary of the model +#' summary(model) +#' +#' # make predictions +#' predictions <- predict(model, df) +#' +#' # save and load the model +#' path <- "path/to/model" +#' write.ml(model, path) +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' } +#' @note spark.mlp since 2.1.0 +setMethod("spark.mlp", signature(data = "SparkDataFrame"), + function(data, blockSize = 128, layers = c(3, 5, 2), solver = "l-bfgs", maxIter = 100, + tol = 0.5, stepSize = 1, seed = 1) { + jobj <- callJStatic("org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper", + "fit", data@sdf, as.integer(blockSize), as.array(layers), + as.character(solver), as.integer(maxIter), as.numeric(tol), + as.numeric(stepSize), as.integer(seed)) + new("MultilayerPerceptronClassificationModel", jobj = jobj) + }) + +# Makes predictions from a model produced by spark.mlp(). + +#' @param newData a SparkDataFrame for testing. +#' @return \code{predict} returns a SparkDataFrame containing predicted labels in a column named +#' "prediction". 
+#' @rdname spark.mlp +#' @aliases predict,MultilayerPerceptronClassificationModel-method +#' @export +#' @note predict(MultilayerPerceptronClassificationModel) since 2.1.0 +setMethod("predict", signature(object = "MultilayerPerceptronClassificationModel"), + function(object, newData) { + predict_internal(object, newData) + }) + +# Returns the summary of a Multilayer Perceptron Classification Model produced by \code{spark.mlp} + +#' @param object a Multilayer Perceptron Classification Model fitted by \code{spark.mlp} +#' @return \code{summary} returns a list containing \code{labelCount} (number of distinct labels), +#' \code{layers} (nodes per layer) and \code{weights} (a one-column matrix of model weights). +#' @rdname spark.mlp +#' @export +#' @aliases summary,MultilayerPerceptronClassificationModel-method +#' @note summary(MultilayerPerceptronClassificationModel) since 2.1.0 +setMethod("summary", signature(object = "MultilayerPerceptronClassificationModel"), + function(object) { + jobj <- object@jobj + labelCount <- callJMethod(jobj, "labelCount") + layers <- unlist(callJMethod(jobj, "layers")) + weights <- callJMethod(jobj, "weights") + weights <- matrix(weights, nrow = length(weights)) + list(labelCount = labelCount, layers = layers, weights = weights) + }) + #' Naive Bayes Models #' #' \code{spark.naiveBayes} fits a Bernoulli naive Bayes model against a SparkDataFrame. 
@@ -685,7 +781,7 @@ setMethod("spark.naiveBayes", signature(data = "SparkDataFrame", formula = "form #' #' @rdname spark.naiveBayes #' @export -#' @seealso \link{read.ml} +#' @seealso \link{write.ml} #' @note write.ml(NaiveBayesModel, character) since 2.0.0 setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"), function(object, path, overwrite = FALSE) { @@ -700,7 +796,7 @@ setMethod("write.ml", signature(object = "NaiveBayesModel", path = "character"), #' @rdname spark.survreg #' @export #' @note write.ml(AFTSurvivalRegressionModel, character) since 2.0.0 -#' @seealso \link{read.ml} +#' @seealso \link{write.ml} setMethod("write.ml", signature(object = "AFTSurvivalRegressionModel", path = "character"), function(object, path, overwrite = FALSE) { write_internal(object, path, overwrite) @@ -734,6 +830,23 @@ setMethod("write.ml", signature(object = "KMeansModel", path = "character"), write_internal(object, path, overwrite) }) +# Saves the Multilayer Perceptron Classification Model to the input path. + +#' @param path the directory where the model is saved. +#' @param overwrite overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. 
+#' +#' @rdname spark.mlp +#' @aliases write.ml,MultilayerPerceptronClassificationModel,character-method +#' @export +#' @seealso \link{write.ml} +#' @note write.ml(MultilayerPerceptronClassificationModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "MultilayerPerceptronClassificationModel", + path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + # Save fitted IsotonicRegressionModel to the input path #' @param path The directory where the model is saved @@ -791,6 +904,8 @@ read.ml <- function(path) { new("KMeansModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) { new("LDAModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper")) { + new("MultilayerPerceptronClassificationModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) { new("IsotonicRegressionModel", jobj = jobj) } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) { @@ -798,7 +913,7 @@ read.ml <- function(path) { } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) { new("ALSModel", jobj = jobj) } else { - stop(paste("Unsupported model: ", jobj)) + stop("Unsupported model: ", jobj) } } http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/R/pkg/inst/tests/testthat/test_mllib.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index de9bd48..1e6da65 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -347,6 +347,38 @@ test_that("spark.kmeans", { unlink(modelPath) }) +test_that("spark.mlp", { + df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm") + model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100, + 
tol = 0.5, stepSize = 1, seed = 1) + + # Test summary method + summary <- summary(model) + expect_equal(summary$labelCount, 3) + expect_equal(summary$layers, c(4, 5, 4, 3)) + expect_equal(length(summary$weights), 64) + + # Test predict method + mlpTestDF <- df + mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) + expect_equal(head(mlpPredictions$prediction, 6), c(0, 1, 1, 1, 1, 1)) + + # Test model save/load + modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp") + write.ml(model, modelPath) + expect_error(write.ml(model, modelPath)) + write.ml(model, modelPath, overwrite = TRUE) + model2 <- read.ml(modelPath) + summary2 <- summary(model2) + + expect_equal(summary2$labelCount, 3) + expect_equal(summary2$layers, c(4, 5, 4, 3)) + expect_equal(length(summary2$weights), 64) + + unlink(modelPath) + +}) + test_that("spark.naiveBayes", { # R code to reproduce the result. # We do not support instance weights yet. So we ignore the frequencies. http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala new file mode 100644 index 0000000..be51e74 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/MultilayerPerceptronClassifierWrapper.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.r + +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.classification.{MultilayerPerceptronClassificationModel, MultilayerPerceptronClassifier} +import org.apache.spark.ml.util.{MLReadable, MLReader, MLWritable, MLWriter} +import org.apache.spark.sql.{DataFrame, Dataset} + +private[r] class MultilayerPerceptronClassifierWrapper private ( + val pipeline: PipelineModel, + val labelCount: Long, + val layers: Array[Int], + val weights: Array[Double] + ) extends MLWritable { + + def transform(dataset: Dataset[_]): DataFrame = { + pipeline.transform(dataset) + } + + /** + * Returns an [[MLWriter]] instance for this ML instance. 
+ */ + override def write: MLWriter = + new MultilayerPerceptronClassifierWrapper.MultilayerPerceptronClassifierWrapperWriter(this) +} + +private[r] object MultilayerPerceptronClassifierWrapper + extends MLReadable[MultilayerPerceptronClassifierWrapper] { + + val PREDICTED_LABEL_COL = "prediction" + + def fit( + data: DataFrame, + blockSize: Int, + layers: Array[Double], + solver: String, + maxIter: Int, + tol: Double, + stepSize: Double, + seed: Int + ): MultilayerPerceptronClassifierWrapper = { + // get labels and feature names from output schema + val schema = data.schema + + // assemble and fit the pipeline + val mlp = new MultilayerPerceptronClassifier() + .setLayers(layers.map(_.toInt)) + .setBlockSize(blockSize) + .setSolver(solver) + .setMaxIter(maxIter) + .setTol(tol) + .setStepSize(stepSize) + .setSeed(seed) + .setPredictionCol(PREDICTED_LABEL_COL) + val pipeline = new Pipeline() + .setStages(Array(mlp)) + .fit(data) + + val multilayerPerceptronClassificationModel: MultilayerPerceptronClassificationModel = + pipeline.stages.head.asInstanceOf[MultilayerPerceptronClassificationModel] + + val weights = multilayerPerceptronClassificationModel.weights.toArray + val layersFromPipeline = multilayerPerceptronClassificationModel.layers + val labelCount = data.select("label").distinct().count() + + new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layersFromPipeline, weights) + } + + /** + * Returns an [[MLReader]] instance for this class. 
+ */ + override def read: MLReader[MultilayerPerceptronClassifierWrapper] = + new MultilayerPerceptronClassifierWrapperReader + + override def load(path: String): MultilayerPerceptronClassifierWrapper = super.load(path) + + class MultilayerPerceptronClassifierWrapperReader + extends MLReader[MultilayerPerceptronClassifierWrapper]{ + + override def load(path: String): MultilayerPerceptronClassifierWrapper = { + implicit val format = DefaultFormats + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadata = parse(rMetadataStr) + val labelCount = (rMetadata \ "labelCount").extract[Long] + val layers = (rMetadata \ "layers").extract[Array[Int]] + val weights = (rMetadata \ "weights").extract[Array[Double]] + + val pipeline = PipelineModel.load(pipelinePath) + new MultilayerPerceptronClassifierWrapper(pipeline, labelCount, layers, weights) + } + } + + class MultilayerPerceptronClassifierWrapperWriter(instance: MultilayerPerceptronClassifierWrapper) + extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadata = ("class" -> instance.getClass.getName) ~ + ("labelCount" -> instance.labelCount) ~ + ("layers" -> instance.layers.toSeq) ~ + ("weights" -> instance.weights.toArray.toSeq) + val rMetadataJson: String = compact(render(rMetadata)) + sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + + instance.pipeline.save(pipelinePath) + } + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/2fbdb606/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index 
51a65f7..d64de1b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -44,6 +44,8 @@ private[r] object RWrappers extends MLReader[Object] { GeneralizedLinearRegressionWrapper.load(path) case "org.apache.spark.ml.r.KMeansWrapper" => KMeansWrapper.load(path) + case "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper" => + MultilayerPerceptronClassifierWrapper.load(path) case "org.apache.spark.ml.r.LDAWrapper" => LDAWrapper.load(path) case "org.apache.spark.ml.r.IsotonicRegressionWrapper" => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org