http://git-wip-us.apache.org/repos/asf/spark/blob/6b6b555a/R/pkg/R/mllib_tree.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/mllib_tree.R b/R/pkg/R/mllib_tree.R new file mode 100644 index 0000000..0d53fad --- /dev/null +++ b/R/pkg/R/mllib_tree.R @@ -0,0 +1,496 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# mllib_tree.R: Provides methods for MLlib tree-based algorithms integration + +#' S4 class that represents a GBTRegressionModel +#' +#' @param jobj a Java object reference to the backing Scala GBTRegressionModel +#' @export +#' @note GBTRegressionModel since 2.1.0 +setClass("GBTRegressionModel", representation(jobj = "jobj")) + +#' S4 class that represents a GBTClassificationModel +#' +#' @param jobj a Java object reference to the backing Scala GBTClassificationModel +#' @export +#' @note GBTClassificationModel since 2.1.0 +setClass("GBTClassificationModel", representation(jobj = "jobj")) + +#' S4 class that represents a RandomForestRegressionModel +#' +#' @param jobj a Java object reference to the backing Scala RandomForestRegressionModel +#' @export +#' @note RandomForestRegressionModel since 2.1.0 +setClass("RandomForestRegressionModel", representation(jobj = "jobj")) + +#' S4 class that represents a RandomForestClassificationModel +#' +#' @param jobj a Java object reference to the backing Scala RandomForestClassificationModel +#' @export +#' @note RandomForestClassificationModel since 2.1.0 +setClass("RandomForestClassificationModel", representation(jobj = "jobj")) + +# Create the summary of a tree ensemble model (eg. Random Forest, GBT) +summary.treeEnsemble <- function(model) { + jobj <- model@jobj + formula <- callJMethod(jobj, "formula") + numFeatures <- callJMethod(jobj, "numFeatures") + features <- callJMethod(jobj, "features") + featureImportances <- callJMethod(callJMethod(jobj, "featureImportances"), "toString") + numTrees <- callJMethod(jobj, "numTrees") + treeWeights <- callJMethod(jobj, "treeWeights") + list(formula = formula, + numFeatures = numFeatures, + features = features, + featureImportances = featureImportances, + numTrees = numTrees, + treeWeights = treeWeights, + jobj = jobj) +} + +# Prints the summary of tree ensemble models (eg. Random Forest, GBT) +print.summary.treeEnsemble <- function(x) { + jobj <- x$jobj + cat("Formula: ", x$formula) + cat("\nNumber of features: ", x$numFeatures) + cat("\nFeatures: ", unlist(x$features)) + cat("\nFeature importances: ", x$featureImportances) + cat("\nNumber of trees: ", x$numTrees) + cat("\nTree weights: ", unlist(x$treeWeights)) + + summaryStr <- callJMethod(jobj, "summary") + cat("\n", summaryStr, "\n") + invisible(x) +} + +#' Gradient Boosted Tree Model for Regression and Classification +#' +#' \code{spark.gbt} fits a Gradient Boosted Tree Regression model or Classification model on a +#' SparkDataFrame. Users can call \code{summary} to get a summary of the fitted +#' Gradient Boosted Tree model, \code{predict} to make predictions on new data, and +#' \code{write.ml}/\code{read.ml} to save/load fitted models. +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression}{ +#' GBT Regression} and +#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-classifier}{ +#' GBT Classification} +#' +#' @param data a SparkDataFrame for training. +#' @param formula a symbolic description of the model to be fitted. Currently only a few formula +#' operators are supported, including '~', ':', '+', and '-'. +#' @param type type of model, one of "regression" or "classification", to fit +#' @param maxDepth Maximum depth of the tree (>= 0). +#' @param maxBins Maximum number of bins used for discretizing continuous features and for choosing +#' how to split on features at each node. More bins give higher granularity. Must be +#' >= 2 and >= number of categories in any categorical feature. +#' @param maxIter Param for maximum number of iterations (>= 0). +#' @param stepSize Param for Step size to be used for each iteration of optimization. +#' @param lossType Loss function which GBT tries to minimize. +#' For classification, must be "logistic". For regression, must be one of +#' "squared" (L2) and "absolute" (L1), default is "squared". +#' @param seed integer seed for random number generation. +#' @param subsamplingRate Fraction of the training data used for learning each decision tree, in +#' range (0, 1]. +#' @param minInstancesPerNode Minimum number of instances each child must have after split. If a +#' split causes the left or right child to have fewer than +#' minInstancesPerNode, the split will be discarded as invalid. Should be +#' >= 1. +#' @param minInfoGain Minimum information gain for a split to be considered at a tree node. +#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1). +#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation. +#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with +#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching +#' can speed up training of deeper trees. Users can set how often should the +#' cache be checkpointed or disable it by setting checkpointInterval. +#' @param ... additional arguments passed to the method. +#' @aliases spark.gbt,SparkDataFrame,formula-method +#' @return \code{spark.gbt} returns a fitted Gradient Boosted Tree model. +#' @rdname spark.gbt +#' @name spark.gbt +#' @export +#' @examples +#' \dontrun{ +#' # fit a Gradient Boosted Tree Regression Model +#' df <- createDataFrame(longley) +#' model <- spark.gbt(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16) +#' +#' # get the summary of the model +#' summary(model) +#' +#' # make predictions +#' predictions <- predict(model, df) +#' +#' # save and load the model +#' path <- "path/to/model" +#' write.ml(model, path) +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' +#' # fit a Gradient Boosted Tree Classification Model +#' # label must be binary - Only binary classification is supported for GBT. +#' df <- createDataFrame(iris[iris$Species != "virginica", ]) +#' model <- spark.gbt(df, Species ~ Petal_Length + Petal_Width, "classification") +#' +#' # numeric label is also supported +#' iris2 <- iris[iris$Species != "virginica", ] +#' iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1) +#' df <- createDataFrame(iris2) +#' model <- spark.gbt(df, NumericSpecies ~ ., type = "classification") +#' } +#' @note spark.gbt since 2.1.0 +setMethod("spark.gbt", signature(data = "SparkDataFrame", formula = "formula"), + function(data, formula, type = c("regression", "classification"), + maxDepth = 5, maxBins = 32, maxIter = 20, stepSize = 0.1, lossType = NULL, + seed = NULL, subsamplingRate = 1.0, minInstancesPerNode = 1, minInfoGain = 0.0, + checkpointInterval = 10, maxMemoryInMB = 256, cacheNodeIds = FALSE) { + type <- match.arg(type) + formula <- paste(deparse(formula), collapse = "") + if (!is.null(seed)) { + seed <- as.character(as.integer(seed)) + } + switch(type, + regression = { + if (is.null(lossType)) lossType <- "squared" + lossType <- match.arg(lossType, c("squared", "absolute")) + jobj <- callJStatic("org.apache.spark.ml.r.GBTRegressorWrapper", + "fit", data@sdf, formula, as.integer(maxDepth), + as.integer(maxBins), as.integer(maxIter), + as.numeric(stepSize), as.integer(minInstancesPerNode), + as.numeric(minInfoGain), as.integer(checkpointInterval), + lossType, seed, as.numeric(subsamplingRate), + as.integer(maxMemoryInMB), as.logical(cacheNodeIds)) + new("GBTRegressionModel", jobj = jobj) + }, + classification = { + if (is.null(lossType)) lossType <- "logistic" + lossType <- match.arg(lossType, "logistic") + jobj <- callJStatic("org.apache.spark.ml.r.GBTClassifierWrapper", + "fit", data@sdf, formula, as.integer(maxDepth), + as.integer(maxBins), as.integer(maxIter), + as.numeric(stepSize), as.integer(minInstancesPerNode), + as.numeric(minInfoGain), as.integer(checkpointInterval), + lossType, seed, as.numeric(subsamplingRate), + as.integer(maxMemoryInMB), as.logical(cacheNodeIds)) + new("GBTClassificationModel", jobj = jobj) + } + ) + }) + +# Get the summary of a Gradient Boosted Tree Regression Model + +#' @return \code{summary} returns summary information of the fitted model, which is a list. +#' The list of components includes \code{formula} (formula), +#' \code{numFeatures} (number of features), \code{features} (list of features), +#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees), +#' and \code{treeWeights} (tree weights). +#' @rdname spark.gbt +#' @aliases summary,GBTRegressionModel-method +#' @export +#' @note summary(GBTRegressionModel) since 2.1.0 +setMethod("summary", signature(object = "GBTRegressionModel"), + function(object) { + ans <- summary.treeEnsemble(object) + class(ans) <- "summary.GBTRegressionModel" + ans + }) + +# Prints the summary of Gradient Boosted Tree Regression Model + +#' @param x summary object of Gradient Boosted Tree regression model or classification model +#' returned by \code{summary}. +#' @rdname spark.gbt +#' @export +#' @note print.summary.GBTRegressionModel since 2.1.0 +print.summary.GBTRegressionModel <- function(x, ...) { + print.summary.treeEnsemble(x) +} + +# Get the summary of a Gradient Boosted Tree Classification Model + +#' @rdname spark.gbt +#' @aliases summary,GBTClassificationModel-method +#' @export +#' @note summary(GBTClassificationModel) since 2.1.0 +setMethod("summary", signature(object = "GBTClassificationModel"), + function(object) { + ans <- summary.treeEnsemble(object) + class(ans) <- "summary.GBTClassificationModel" + ans + }) + +# Prints the summary of Gradient Boosted Tree Classification Model + +#' @rdname spark.gbt +#' @export +#' @note print.summary.GBTClassificationModel since 2.1.0 +print.summary.GBTClassificationModel <- function(x, ...) { + print.summary.treeEnsemble(x) +} + +# Makes predictions from a Gradient Boosted Tree Regression model or Classification model + +#' @param newData a SparkDataFrame for testing. +#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named +#' "prediction". +#' @rdname spark.gbt +#' @aliases predict,GBTRegressionModel-method +#' @export +#' @note predict(GBTRegressionModel) since 2.1.0 +setMethod("predict", signature(object = "GBTRegressionModel"), + function(object, newData) { + predict_internal(object, newData) + }) + +#' @rdname spark.gbt +#' @aliases predict,GBTClassificationModel-method +#' @export +#' @note predict(GBTClassificationModel) since 2.1.0 +setMethod("predict", signature(object = "GBTClassificationModel"), + function(object, newData) { + predict_internal(object, newData) + }) + +# Save the Gradient Boosted Tree Regression or Classification model to the input path. + +#' @param object A fitted Gradient Boosted Tree regression model or classification model. +#' @param path The directory where the model is saved. +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. +#' @aliases write.ml,GBTRegressionModel,character-method +#' @rdname spark.gbt +#' @export +#' @note write.ml(GBTRegressionModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "GBTRegressionModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + +#' @aliases write.ml,GBTClassificationModel,character-method +#' @rdname spark.gbt +#' @export +#' @note write.ml(GBTClassificationModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "GBTClassificationModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + +#' Random Forest Model for Regression and Classification +#' +#' \code{spark.randomForest} fits a Random Forest Regression model or Classification model on +#' a SparkDataFrame. Users can call \code{summary} to get a summary of the fitted Random Forest +#' model, \code{predict} to make predictions on new data, and \code{write.ml}/\code{read.ml} to +#' save/load fitted models. +#' For more details, see +#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-regression}{ +#' Random Forest Regression} and +#' \href{http://spark.apache.org/docs/latest/ml-classification-regression.html#random-forest-classifier}{ +#' Random Forest Classification} +#' +#' @param data a SparkDataFrame for training. +#' @param formula a symbolic description of the model to be fitted. Currently only a few formula +#' operators are supported, including '~', ':', '+', and '-'. +#' @param type type of model, one of "regression" or "classification", to fit +#' @param maxDepth Maximum depth of the tree (>= 0). +#' @param maxBins Maximum number of bins used for discretizing continuous features and for choosing +#' how to split on features at each node. More bins give higher granularity. Must be +#' >= 2 and >= number of categories in any categorical feature. +#' @param numTrees Number of trees to train (>= 1). +#' @param impurity Criterion used for information gain calculation. +#' For regression, must be "variance". For classification, must be one of +#' "entropy" and "gini", default is "gini". +#' @param featureSubsetStrategy The number of features to consider for splits at each tree node. +#' Supported options: "auto", "all", "onethird", "sqrt", "log2", (0.0-1.0], [1-n]. +#' @param seed integer seed for random number generation. +#' @param subsamplingRate Fraction of the training data used for learning each decision tree, in +#' range (0, 1]. +#' @param minInstancesPerNode Minimum number of instances each child must have after split. +#' @param minInfoGain Minimum information gain for a split to be considered at a tree node. +#' @param checkpointInterval Param for set checkpoint interval (>= 1) or disable checkpoint (-1). +#' @param maxMemoryInMB Maximum memory in MB allocated to histogram aggregation. +#' @param cacheNodeIds If FALSE, the algorithm will pass trees to executors to match instances with +#' nodes. If TRUE, the algorithm will cache node IDs for each instance. Caching +#' can speed up training of deeper trees. Users can set how often should the +#' cache be checkpointed or disable it by setting checkpointInterval. +#' @param ... additional arguments passed to the method. +#' @aliases spark.randomForest,SparkDataFrame,formula-method +#' @return \code{spark.randomForest} returns a fitted Random Forest model. +#' @rdname spark.randomForest +#' @name spark.randomForest +#' @export +#' @examples +#' \dontrun{ +#' # fit a Random Forest Regression Model +#' df <- createDataFrame(longley) +#' model <- spark.randomForest(df, Employed ~ ., type = "regression", maxDepth = 5, maxBins = 16) +#' +#' # get the summary of the model +#' summary(model) +#' +#' # make predictions +#' predictions <- predict(model, df) +#' +#' # save and load the model +#' path <- "path/to/model" +#' write.ml(model, path) +#' savedModel <- read.ml(path) +#' summary(savedModel) +#' +#' # fit a Random Forest Classification Model +#' df <- createDataFrame(iris) +#' model <- spark.randomForest(df, Species ~ Petal_Length + Petal_Width, "classification") +#' } +#' @note spark.randomForest since 2.1.0 +setMethod("spark.randomForest", signature(data = "SparkDataFrame", formula = "formula"), + function(data, formula, type = c("regression", "classification"), + maxDepth = 5, maxBins = 32, numTrees = 20, impurity = NULL, + featureSubsetStrategy = "auto", seed = NULL, subsamplingRate = 1.0, + minInstancesPerNode = 1, minInfoGain = 0.0, checkpointInterval = 10, + maxMemoryInMB = 256, cacheNodeIds = FALSE) { + type <- match.arg(type) + formula <- paste(deparse(formula), collapse = "") + if (!is.null(seed)) { + seed <- as.character(as.integer(seed)) + } + switch(type, + regression = { + if (is.null(impurity)) impurity <- "variance" + impurity <- match.arg(impurity, "variance") + jobj <- callJStatic("org.apache.spark.ml.r.RandomForestRegressorWrapper", + "fit", data@sdf, formula, as.integer(maxDepth), + as.integer(maxBins), as.integer(numTrees), + impurity, as.integer(minInstancesPerNode), + as.numeric(minInfoGain), as.integer(checkpointInterval), + as.character(featureSubsetStrategy), seed, + as.numeric(subsamplingRate), + as.integer(maxMemoryInMB), as.logical(cacheNodeIds)) + new("RandomForestRegressionModel", jobj = jobj) + }, + classification = { + if (is.null(impurity)) impurity <- "gini" + impurity <- match.arg(impurity, c("gini", "entropy")) + jobj <- callJStatic("org.apache.spark.ml.r.RandomForestClassifierWrapper", + "fit", data@sdf, formula, as.integer(maxDepth), + as.integer(maxBins), as.integer(numTrees), + impurity, as.integer(minInstancesPerNode), + as.numeric(minInfoGain), as.integer(checkpointInterval), + as.character(featureSubsetStrategy), seed, + as.numeric(subsamplingRate), + as.integer(maxMemoryInMB), as.logical(cacheNodeIds)) + new("RandomForestClassificationModel", jobj = jobj) + } + ) + }) + +# Get the summary of a Random Forest Regression Model + +#' @return \code{summary} returns summary information of the fitted model, which is a list. +#' The list of components includes \code{formula} (formula), +#' \code{numFeatures} (number of features), \code{features} (list of features), +#' \code{featureImportances} (feature importances), \code{numTrees} (number of trees), +#' and \code{treeWeights} (tree weights). +#' @rdname spark.randomForest +#' @aliases summary,RandomForestRegressionModel-method +#' @export +#' @note summary(RandomForestRegressionModel) since 2.1.0 +setMethod("summary", signature(object = "RandomForestRegressionModel"), + function(object) { + ans <- summary.treeEnsemble(object) + class(ans) <- "summary.RandomForestRegressionModel" + ans + }) + +# Prints the summary of Random Forest Regression Model + +#' @param x summary object of Random Forest regression model or classification model +#' returned by \code{summary}. +#' @rdname spark.randomForest +#' @export +#' @note print.summary.RandomForestRegressionModel since 2.1.0 +print.summary.RandomForestRegressionModel <- function(x, ...) { + print.summary.treeEnsemble(x) +} + +# Get the summary of a Random Forest Classification Model + +#' @rdname spark.randomForest +#' @aliases summary,RandomForestClassificationModel-method +#' @export +#' @note summary(RandomForestClassificationModel) since 2.1.0 +setMethod("summary", signature(object = "RandomForestClassificationModel"), + function(object) { + ans <- summary.treeEnsemble(object) + class(ans) <- "summary.RandomForestClassificationModel" + ans + }) + +# Prints the summary of Random Forest Classification Model + +#' @rdname spark.randomForest +#' @export +#' @note print.summary.RandomForestClassificationModel since 2.1.0 +print.summary.RandomForestClassificationModel <- function(x, ...) { + print.summary.treeEnsemble(x) +} + +# Makes predictions from a Random Forest Regression model or Classification model + +#' @param newData a SparkDataFrame for testing. +#' @return \code{predict} returns a SparkDataFrame containing predicted labeled in a column named +#' "prediction". +#' @rdname spark.randomForest +#' @aliases predict,RandomForestRegressionModel-method +#' @export +#' @note predict(RandomForestRegressionModel) since 2.1.0 +setMethod("predict", signature(object = "RandomForestRegressionModel"), + function(object, newData) { + predict_internal(object, newData) + }) + +#' @rdname spark.randomForest +#' @aliases predict,RandomForestClassificationModel-method +#' @export +#' @note predict(RandomForestClassificationModel) since 2.1.0 +setMethod("predict", signature(object = "RandomForestClassificationModel"), + function(object, newData) { + predict_internal(object, newData) + }) + +# Save the Random Forest Regression or Classification model to the input path. + +#' @param object A fitted Random Forest regression model or classification model. +#' @param path The directory where the model is saved. +#' @param overwrite Overwrites or not if the output path already exists. Default is FALSE +#' which means throw exception if the output path exists. +#' +#' @aliases write.ml,RandomForestRegressionModel,character-method +#' @rdname spark.randomForest +#' @export +#' @note write.ml(RandomForestRegressionModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "RandomForestRegressionModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + }) + +#' @aliases write.ml,RandomForestClassificationModel,character-method +#' @rdname spark.randomForest +#' @export +#' @note write.ml(RandomForestClassificationModel, character) since 2.1.0 +setMethod("write.ml", signature(object = "RandomForestClassificationModel", path = "character"), + function(object, path, overwrite = FALSE) { + write_internal(object, path, overwrite) + })
http://git-wip-us.apache.org/repos/asf/spark/blob/6b6b555a/R/pkg/R/mllib_utils.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R new file mode 100644 index 0000000..720ee41 --- /dev/null +++ b/R/pkg/R/mllib_utils.R @@ -0,0 +1,119 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# mllib_utils.R: Utilities for MLlib integration + +# Integration with R's standard functions. +# Most of MLlib's argorithms are provided in two flavours: +# - a specialization of the default R methods (glm). These methods try to respect +# the inputs and the outputs of R's method to the largest extent, but some small differences +# may exist. +# - a set of methods that reflect the arguments of the other languages supported by Spark. These +# methods are prefixed with the `spark.` prefix: spark.glm, spark.kmeans, etc. + +#' Saves the MLlib model to the input path +#' +#' Saves the MLlib model to the input path. For more information, see the specific +#' MLlib model below. +#' @rdname write.ml +#' @name write.ml +#' @export +#' @seealso \link{spark.glm}, \link{glm}, +#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg}, +#' @seealso \link{spark.kmeans}, +#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes}, +#' @seealso \link{spark.randomForest}, \link{spark.survreg}, +#' @seealso \link{read.ml} +NULL + +#' Makes predictions from a MLlib model +#' +#' Makes predictions from a MLlib model. For more information, see the specific +#' MLlib model below. +#' @rdname predict +#' @name predict +#' @export +#' @seealso \link{spark.glm}, \link{glm}, +#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg}, +#' @seealso \link{spark.kmeans}, +#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes}, +#' @seealso \link{spark.randomForest}, \link{spark.survreg} +NULL + +write_internal <- function(object, path, overwrite = FALSE) { + writer <- callJMethod(object@jobj, "write") + if (overwrite) { + writer <- callJMethod(writer, "overwrite") + } + invisible(callJMethod(writer, "save", path)) +} + +predict_internal <- function(object, newData) { + dataFrame(callJMethod(object@jobj, "transform", newData@sdf)) +} + +#' Load a fitted MLlib model from the input path. +#' +#' @param path path of the model to read. +#' @return A fitted MLlib model. +#' @rdname read.ml +#' @name read.ml +#' @export +#' @seealso \link{write.ml} +#' @examples +#' \dontrun{ +#' path <- "path/to/model" +#' model <- read.ml(path) +#' } +#' @note read.ml since 2.0.0 +read.ml <- function(path) { + path <- suppressWarnings(normalizePath(path)) + sparkSession <- getSparkSession() + callJStatic("org.apache.spark.ml.r.RWrappers", "session", sparkSession) + jobj <- callJStatic("org.apache.spark.ml.r.RWrappers", "load", path) + if (isInstanceOf(jobj, "org.apache.spark.ml.r.NaiveBayesWrapper")) { + new("NaiveBayesModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.AFTSurvivalRegressionWrapper")) { + new("AFTSurvivalRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GeneralizedLinearRegressionWrapper")) { + new("GeneralizedLinearRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.KMeansWrapper")) { + new("KMeansModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LDAWrapper")) { + new("LDAModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.MultilayerPerceptronClassifierWrapper")) { + new("MultilayerPerceptronClassificationModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.IsotonicRegressionWrapper")) { + new("IsotonicRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GaussianMixtureWrapper")) { + new("GaussianMixtureModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.ALSWrapper")) { + new("ALSModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LogisticRegressionWrapper")) { + new("LogisticRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.RandomForestRegressorWrapper")) { + new("RandomForestRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.RandomForestClassifierWrapper")) { + new("RandomForestClassificationModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTRegressorWrapper")) { + new("GBTRegressionModel", jobj = jobj) + } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) { + new("GBTClassificationModel", jobj = jobj) + } else { + stop("Unsupported model: ", jobj) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/6b6b555a/R/pkg/inst/tests/testthat/test_mllib.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R deleted file mode 100644 index 0f0d831..0000000 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ /dev/null @@ -1,1170 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("MLlib functions") - -# Tests for MLlib functions in SparkR -sparkSession <- sparkR.session(enableHiveSupport = FALSE) - -absoluteSparkPath <- function(x) { - sparkHome <- sparkR.conf("spark.home") - file.path(sparkHome, x) -} - -test_that("formula of spark.glm", { - training <- suppressWarnings(createDataFrame(iris)) - # directly calling the spark API - # dot minus and intercept vs native glm - model <- spark.glm(training, Sepal_Width ~ . - Species + 0) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # feature interaction vs native glm - model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # glm should work with long formula - training <- suppressWarnings(createDataFrame(iris)) - training$LongLongLongLongLongName <- training$Sepal_Width - training$VeryLongLongLongLonLongName <- training$Sepal_Length - training$AnotherLongLongLongLongName <- training$Species - model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName + - AnotherLongLongLongLongName) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("spark.glm and predict", { - training <- suppressWarnings(createDataFrame(iris)) - # gaussian family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # poisson family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, - family = poisson(link = identity)) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, - data = iris, family = poisson(link = identity)), iris)) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # Test stats::predict is working - x <- rnorm(15) - y <- x + rnorm(15) - expect_equal(length(predict(lm(y ~ x))), 15) -}) - -test_that("spark.glm summary", { - # gaussian family - training <- suppressWarnings(createDataFrame(iris)) - stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species)) - - rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - out <- capture.output(print(stats)) - expect_match(out[2], "Deviance Residuals:") - expect_true(any(grepl("AIC: 59.22", out))) - - # binomial family - df <- suppressWarnings(createDataFrame(iris)) - training <- df[df$Species %in% c("versicolor", "virginica"), ] - stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width, - family = binomial(link = "logit"))) - - rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ] - rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, - family = binomial(link = "logit"))) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Sepal_Width"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # Test spark.glm works with weighted dataset - a1 <- c(0, 1, 2, 3) - a2 <- c(5, 2, 1, 3) - w <- c(1, 2, 3, 4) - b <- c(1, 0, 1, 0) - data <- as.data.frame(cbind(a1, a2, w, b)) - df <- createDataFrame(data) - - stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w")) - rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w)) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-3)) - expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # Test summary works on base GLM models - baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) - baseSummary <- summary(baseModel) - expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) - - # Test spark.glm works with regularization parameter - data <- as.data.frame(cbind(a1, a2, b)) - df <- suppressWarnings(createDataFrame(data)) - regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0)) - expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result - - # Test spark.glm works on collinear data - A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2) - b <- c(1, 2, 3, 4) - data <- as.data.frame(cbind(A, b)) - df <- createDataFrame(data) - stats <- summary(spark.glm(df, b ~ . - 1)) - coefs <- unlist(stats$coefficients) - expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4)) -}) - -test_that("spark.glm save/load", { - training <- suppressWarnings(createDataFrame(iris)) - m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) - s <- summary(m) - - modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp") - write.ml(m, modelPath) - expect_error(write.ml(m, modelPath)) - write.ml(m, modelPath, overwrite = TRUE) - m2 <- read.ml(modelPath) - s2 <- summary(m2) - - expect_equal(s$coefficients, s2$coefficients) - expect_equal(rownames(s$coefficients), rownames(s2$coefficients)) - expect_equal(s$dispersion, s2$dispersion) - expect_equal(s$null.deviance, s2$null.deviance) - expect_equal(s$deviance, s2$deviance) - expect_equal(s$df.null, s2$df.null) - expect_equal(s$df.residual, s2$df.residual) - expect_equal(s$aic, s2$aic) - expect_equal(s$iter, s2$iter) - expect_true(!s$is.loaded) - expect_true(s2$is.loaded) - - unlink(modelPath) -}) - - - -test_that("formula of glm", { - training <- suppressWarnings(createDataFrame(iris)) - # dot minus and intercept vs native glm - model <- glm(Sepal_Width ~ . - Species + 0, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . - Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # feature interaction vs native glm - model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # glm should work with long formula - training <- suppressWarnings(createDataFrame(iris)) - training$LongLongLongLongLongName <- training$Sepal_Width - training$VeryLongLongLongLonLongName <- training$Sepal_Length - training$AnotherLongLongLongLongName <- training$Species - model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName, - data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("glm and predict", { - training <- suppressWarnings(createDataFrame(iris)) - # gaussian family - model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # poisson family - model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, - family = poisson(link = identity)) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, - data = iris, family = poisson(link = identity)), iris)) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # Test stats::predict is working - x <- rnorm(15) - y <- x + rnorm(15) - expect_equal(length(predict(lm(y ~ x))), 15) -}) - -test_that("glm summary", { - # gaussian family - training <- suppressWarnings(createDataFrame(iris)) - stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training)) - - rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # binomial family - df <- suppressWarnings(createDataFrame(iris)) - training <- df[df$Species %in% c("versicolor", "virginica"), ] - stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training, - family = binomial(link = "logit"))) - - rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ] - rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, - family = binomial(link = "logit"))) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Sepal_Width"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # Test summary works on base GLM models - baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) - baseSummary <- summary(baseModel) - expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) -}) - -test_that("glm save/load", { - training <- suppressWarnings(createDataFrame(iris)) - m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) - s <- summary(m) - - modelPath <- tempfile(pattern = "glm", fileext = ".tmp") - write.ml(m, modelPath) - expect_error(write.ml(m, modelPath)) - write.ml(m, modelPath, overwrite = TRUE) - m2 <- read.ml(modelPath) - s2 <- summary(m2) - - expect_equal(s$coefficients, s2$coefficients) - expect_equal(rownames(s$coefficients), rownames(s2$coefficients)) - expect_equal(s$dispersion, s2$dispersion) - expect_equal(s$null.deviance, s2$null.deviance) - expect_equal(s$deviance, s2$deviance) - expect_equal(s$df.null, s2$df.null) - expect_equal(s$df.residual, s2$df.residual) - expect_equal(s$aic, s2$aic) - expect_equal(s$iter, s2$iter) - expect_true(!s$is.loaded) - expect_true(s2$is.loaded) - - unlink(modelPath) -}) - -test_that("spark.kmeans", { - newIris <- iris - newIris$Species <- NULL - training <- suppressWarnings(createDataFrame(newIris)) - - take(training, 1) - - model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random") - sample <- take(select(predict(model, training), "prediction"), 1) - expect_equal(typeof(sample$prediction), "integer") - expect_equal(sample$prediction, 1) - - # Test stats::kmeans is working - statsModel <- kmeans(x = newIris, centers = 2) - expect_equal(sort(unique(statsModel$cluster)), c(1, 2)) - - # Test fitted works on KMeans - fitted.model <- fitted(model) - expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1)) - - # Test summary works on KMeans - summary.model <- summary(model) - cluster <- summary.model$cluster - k <- summary.model$k - expect_equal(k, 2) - expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - summary2 <- summary(model2) - expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size))) - expect_equal(summary.model$coefficients, summary2$coefficients) - expect_true(!summary.model$is.loaded) - expect_true(summary2$is.loaded) - - unlink(modelPath) -}) - -test_that("spark.mlp", { - df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"), - source = "libsvm") - model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3), - solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1) - - # Test summary method - summary <- summary(model) - expect_equal(summary$numOfInputs, 4) - expect_equal(summary$numOfOutputs, 3) - expect_equal(summary$layers, c(4, 5, 4, 3)) - expect_equal(length(summary$weights), 64) - expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825), - tolerance = 1e-6) - - # Test predict method - mlpTestDF <- df - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0")) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - summary2 <- summary(model2) - - expect_equal(summary2$numOfInputs, 4) - expect_equal(summary2$numOfOutputs, 3) - expect_equal(summary2$layers, c(4, 5, 4, 3)) - expect_equal(length(summary2$weights), 64) - - unlink(modelPath) - - # Test default parameter - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3)) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - # Test illegal parameter - expect_error(spark.mlp(df, label ~ features, layers = NULL), - "layers must be a integer vector with length > 1.") - expect_error(spark.mlp(df, label ~ features, layers = c()), - "layers must be a integer vector with length > 1.") - expect_error(spark.mlp(df, label ~ features, layers = c(3)), - "layers must be a integer vector with length > 1.") - - # Test random seed - # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - # seed equals 10 - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - # test initialWeights - model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights = - c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9)) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights = - c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0)) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0")) - - # Test formula works well - df <- suppressWarnings(createDataFrame(iris)) - model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width, - layers = c(4, 3)) - summary <- summary(model) - expect_equal(summary$numOfInputs, 4) - expect_equal(summary$numOfOutputs, 3) - expect_equal(summary$layers, c(4, 3)) - expect_equal(length(summary$weights), 15) - expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413, - -10.2376130), tolerance = 1e-6) -}) - -test_that("spark.naiveBayes", { - # R code to reproduce the result. - # We do not support instance weights yet. So we ignore the frequencies. - # - #' library(e1071) - #' t <- as.data.frame(Titanic) - #' t1 <- t[t$Freq > 0, -5] - #' m <- naiveBayes(Survived ~ ., data = t1) - #' m - #' predict(m, t1) - # - # -- output of 'm' - # - # A-priori probabilities: - # Y - # No Yes - # 0.4166667 0.5833333 - # - # Conditional probabilities: - # Class - # Y 1st 2nd 3rd Crew - # No 0.2000000 0.2000000 0.4000000 0.2000000 - # Yes 0.2857143 0.2857143 0.2857143 0.1428571 - # - # Sex - # Y Male Female - # No 0.5 0.5 - # Yes 0.5 0.5 - # - # Age - # Y Child Adult - # No 0.2000000 0.8000000 - # Yes 0.4285714 0.5714286 - # - # -- output of 'predict(m, t1)' - # - # Yes Yes Yes Yes No No Yes Yes No No Yes Yes Yes Yes Yes Yes Yes Yes No No Yes Yes No No - # - - t <- as.data.frame(Titanic) - t1 <- t[t$Freq > 0, -5] - df <- suppressWarnings(createDataFrame(t1)) - m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0) - s <- summary(m) - expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6) - expect_equal(sum(s$apriori), 1) - expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6) - p <- collect(select(predict(m, df), "prediction")) - expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No", - "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No", - "Yes", "Yes", "No", "No")) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp") - write.ml(m, modelPath) - expect_error(write.ml(m, modelPath)) - write.ml(m, modelPath, overwrite = TRUE) - m2 <- read.ml(modelPath) - s2 <- summary(m2) - expect_equal(s$apriori, s2$apriori) - expect_equal(s$tables, s2$tables) - - unlink(modelPath) - - # Test e1071::naiveBayes - if (requireNamespace("e1071", quietly = TRUE)) { - expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA) - expect_equal(as.character(predict(m, t1[1, ])), "Yes") - } - - # Test numeric response variable - t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1) - t2 <- t1[-4] - df <- suppressWarnings(createDataFrame(t2)) - m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0) - s <- summary(m) - expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6) - expect_equal(sum(s$apriori), 1) - expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6) -}) - -test_that("spark.survreg", { - # R code to reproduce the result. - # - #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0), - #' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1)) - #' library(survival) - #' model <- survreg(Surv(time, status) ~ x + sex, rData) - #' summary(model) - #' predict(model, data) - # - # -- output of 'summary(model)' - # - # Value Std. Error z p - # (Intercept) 1.315 0.270 4.88 1.07e-06 - # x -0.190 0.173 -1.10 2.72e-01 - # sex -0.253 0.329 -0.77 4.42e-01 - # Log(scale) -1.160 0.396 -2.93 3.41e-03 - # - # -- output of 'predict(model, data)' - # - # 1 2 3 4 5 6 7 - # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269 - # - data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0), - list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1)) - df <- createDataFrame(data, c("time", "status", "x", "sex")) - model <- spark.survreg(df, Surv(time, status) ~ x + sex) - stats <- summary(model) - coefs <- as.vector(stats$coefficients[, 1]) - rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800) - expect_equal(coefs, rCoefs, tolerance = 1e-4) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "x", "sex", "Log(scale)"))) - p <- collect(select(predict(model, df), "prediction")) - expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035, - 2.390146, 2.891269, 2.891269), tolerance = 1e-4) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - coefs2 <- as.vector(stats2$coefficients[, 1]) - expect_equal(coefs, coefs2) - expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients)) - - unlink(modelPath) - - # Test survival::survreg - if (requireNamespace("survival", quietly = TRUE)) { - rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0), - x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1)) - expect_error( - model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData), - NA) - expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4) - } -}) - -test_that("spark.isotonicRegression", { - label <- c(7.0, 5.0, 3.0, 5.0, 1.0) - feature <- c(0.0, 1.0, 2.0, 3.0, 4.0) - weight <- c(1.0, 1.0, 1.0, 1.0, 1.0) - data <- as.data.frame(cbind(label, feature, weight)) - df <- createDataFrame(data) - - model <- spark.isoreg(df, label ~ feature, isotonic = FALSE, - weightCol = "weight") - # only allow one variable on the right hand side of the formula - expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE)) - result <- summary(model) - expect_equal(result$predictions, list(7, 5, 4, 4, 1)) - - # Test model prediction - predict_data <- list(list(-2.0), list(-1.0), list(0.5), - list(0.75), list(1.0), list(2.0), list(9.0)) - predict_df <- createDataFrame(predict_data, c("feature")) - predict_result <- collect(select(predict(model, predict_df), "prediction")) - expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-isotonicRegression", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - expect_equal(result, summary(model2)) - - unlink(modelPath) -}) - -test_that("spark.logit", { - # R code to reproduce the result. - # nolint start - #' library(glmnet) - #' iris.x = as.matrix(iris[, 1:4]) - #' iris.y = as.factor(as.character(iris[, 5])) - #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5) - #' coef(logit) - # - # $setosa - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # 1.0981324 - # Sepal.Length -0.2909860 - # Sepal.Width 0.5510907 - # Petal.Length -0.1915217 - # Petal.Width -0.4211946 - # - # $versicolor - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # 1.520061e+00 - # Sepal.Length 2.524501e-02 - # Sepal.Width -5.310313e-01 - # Petal.Length 3.656543e-02 - # Petal.Width -3.144464e-05 - # - # $virginica - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # -2.61819385 - # Sepal.Length 0.26574097 - # Sepal.Width -0.02005932 - # Petal.Length 0.15495629 - # Petal.Width 0.42122607 - # nolint end - - # Test multinomial logistic regression againt three classes - df <- suppressWarnings(createDataFrame(iris)) - model <- spark.logit(df, Species ~ ., regParam = 0.5) - summary <- summary(model) - versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00) - virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42) - setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42) - versicolorCoefs <- unlist(summary$coefficients[, "versicolor"]) - virginicaCoefs <- unlist(summary$coefficients[, "virginica"]) - setosaCoefs <- unlist(summary$coefficients[, "setosa"]) - expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1)) - expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1)) - expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1)) - - # Test model save and load - modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - coefs <- summary(model)$coefficients - coefs2 <- summary(model2)$coefficients - expect_equal(coefs, coefs2) - unlink(modelPath) - - # R code to reproduce the result. - # nolint start - #' library(glmnet) - #' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ] - #' iris.x = as.matrix(iris2[, 1:4]) - #' iris.y = as.factor(as.character(iris2[, 5])) - #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5) - #' coef(logit) - # - # $versicolor - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # 3.93844796 - # Sepal.Length -0.13538675 - # Sepal.Width -0.02386443 - # Petal.Length -0.35076451 - # Petal.Width -0.77971954 - # - # $virginica - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # -3.93844796 - # Sepal.Length 0.13538675 - # Sepal.Width 0.02386443 - # Petal.Length 0.35076451 - # Petal.Width 0.77971954 - # - #' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5) - #' coef(logit) - # - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # (Intercept) -6.0824412 - # Sepal.Length 0.2458260 - # Sepal.Width 0.1642093 - # Petal.Length 0.4759487 - # Petal.Width 1.0383948 - # - # nolint end - - # Test multinomial logistic regression againt two classes - df <- suppressWarnings(createDataFrame(iris)) - training <- df[df$Species %in% c("versicolor", "virginica"), ] - model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial") - summary <- summary(model) - versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78) - virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78) - versicolorCoefs <- unlist(summary$coefficients[, "versicolor"]) - virginicaCoefs <- unlist(summary$coefficients[, "virginica"]) - expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1)) - expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1)) - - # Test binomial logistic regression againt two classes - model <- spark.logit(training, Species ~ ., regParam = 0.5) - summary <- summary(model) - coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04) - coefs <- unlist(summary$coefficients[, "Estimate"]) - expect_true(all(abs(coefsR - coefs) < 0.1)) - - # Test prediction with string label - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character") - expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor", - "versicolor", "versicolor", "versicolor", "versicolor", "versicolor") - expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected) - - # Test prediction with numeric label - label <- c(0.0, 0.0, 0.0, 1.0, 1.0) - feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) - data <- as.data.frame(cbind(label, feature)) - df <- createDataFrame(data) - model <- spark.logit(df, label ~ feature) - prediction <- collect(select(predict(model, df), "prediction")) - expect_equal(prediction$prediction, c("0.0", "0.0", "1.0", "1.0", "0.0")) -}) - -test_that("spark.gaussianMixture", { - # R code to reproduce the result. - # nolint start - #' library(mvtnorm) - #' set.seed(1) - #' a <- rmvnorm(7, c(0, 0)) - #' b <- rmvnorm(8, c(10, 10)) - #' data <- rbind(a, b) - #' model <- mvnormalmixEM(data, k = 2) - #' model$lambda - # - # [1] 0.4666667 0.5333333 - # - #' model$mu - # - # [1] 0.11731091 -0.06192351 - # [1] 10.363673 9.897081 - # - #' model$sigma - # - # [[1]] - # [,1] [,2] - # [1,] 0.62049934 0.06880802 - # [2,] 0.06880802 1.27431874 - # - # [[2]] - # [,1] [,2] - # [1,] 0.2961543 0.160783 - # [2,] 0.1607830 1.008878 - # nolint end - data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808), - list(0.3295078, -0.8204684), list(0.4874291, 0.7383247), - list(0.5757814, -0.3053884), list(1.5117812, 0.3898432), - list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664), - list(9.9838097, 10.9438362), list(10.8212212, 10.5939013), - list(10.9189774, 10.7821363), list(10.0745650, 8.0106483), - list(10.6198257, 9.9438713), list(9.8442045, 8.5292476), - list(9.5218499, 10.4179416)) - df <- createDataFrame(data, c("x1", "x2")) - model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2) - stats <- summary(model) - rLambda <- c(0.4666667, 0.5333333) - rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081) - rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874, - 0.2961543, 0.160783, 0.1607830, 1.008878) - expect_equal(stats$lambda, rLambda, tolerance = 1e-3) - expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3) - expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3) - p <- collect(select(predict(model, df), "prediction")) - expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$lambda, stats2$lambda) - expect_equal(unlist(stats$mu), unlist(stats2$mu)) - expect_equal(unlist(stats$sigma), unlist(stats2$sigma)) - - unlink(modelPath) -}) - -test_that("spark.lda with libsvm", { - text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm") - model <- spark.lda(text, optimizer = "em") - - stats <- summary(model, 10) - isDistributed <- stats$isDistributed - logLikelihood <- stats$logLikelihood - logPerplexity <- stats$logPerplexity - vocabSize <- stats$vocabSize - topics <- stats$topicTopTerms - weights <- stats$topicTopTermsWeights - vocabulary <- stats$vocabulary - - expect_false(isDistributed) - expect_true(logLikelihood <= 0 & is.finite(logLikelihood)) - expect_true(logPerplexity >= 0 & is.finite(logPerplexity)) - expect_equal(vocabSize, 11) - expect_true(is.null(vocabulary)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - - expect_false(stats2$isDistributed) - expect_equal(logLikelihood, stats2$logLikelihood) - expect_equal(logPerplexity, stats2$logPerplexity) - expect_equal(vocabSize, stats2$vocabSize) - expect_equal(vocabulary, stats2$vocabulary) - - unlink(modelPath) -}) - -test_that("spark.lda with text input", { - text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt")) - model <- spark.lda(text, optimizer = "online", features = "value") - - stats <- summary(model) - isDistributed <- stats$isDistributed - logLikelihood <- stats$logLikelihood - logPerplexity <- stats$logPerplexity - vocabSize <- stats$vocabSize - topics <- stats$topicTopTerms - weights <- stats$topicTopTermsWeights - vocabulary <- stats$vocabulary - - expect_false(isDistributed) - expect_true(logLikelihood <= 0 & is.finite(logLikelihood)) - expect_true(logPerplexity >= 0 & is.finite(logPerplexity)) - expect_equal(vocabSize, 10) - expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"))) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - - expect_false(stats2$isDistributed) - expect_equal(logLikelihood, stats2$logLikelihood) - expect_equal(logPerplexity, stats2$logPerplexity) - expect_equal(vocabSize, stats2$vocabSize) - expect_true(all.equal(vocabulary, stats2$vocabulary)) - - unlink(modelPath) -}) - -test_that("spark.posterior and spark.perplexity", { - text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt")) - model <- spark.lda(text, features = "value", k = 3) - - # Assert perplexities are equal - stats <- summary(model) - logPerplexity <- spark.perplexity(model, text) - expect_equal(logPerplexity, stats$logPerplexity) - - # Assert the sum of every topic distribution is equal to 1 - posterior <- spark.posterior(model, text) - local.posterior <- collect(posterior)$topicDistribution - expect_equal(length(local.posterior), sum(unlist(local.posterior))) -}) - -test_that("spark.als", { - data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0), - list(2, 1, 1.0), list(2, 2, 5.0)) - df <- createDataFrame(data, c("user", "item", "score")) - model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item", - rank = 10, maxIter = 5, seed = 0, regParam = 0.1) - stats <- summary(model) - expect_equal(stats$rank, 10) - test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item")) - predictions <- collect(predict(model, test)) - - expect_equal(predictions$prediction, c(-0.1380762, 2.6258414, -1.5018409), - tolerance = 1e-4) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-als", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats2$rating, "score") - userFactors <- collect(stats$userFactors) - itemFactors <- collect(stats$itemFactors) - userFactors2 <- collect(stats2$userFactors) - itemFactors2 <- collect(stats2$itemFactors) - - orderUser <- order(userFactors$id) - orderUser2 <- order(userFactors2$id) - expect_equal(userFactors$id[orderUser], userFactors2$id[orderUser2]) - expect_equal(userFactors$features[orderUser], userFactors2$features[orderUser2]) - - orderItem <- order(itemFactors$id) - orderItem2 <- order(itemFactors2$id) - expect_equal(itemFactors$id[orderItem], itemFactors2$id[orderItem2]) - expect_equal(itemFactors$features[orderItem], itemFactors2$features[orderItem2]) - - unlink(modelPath) -}) - -test_that("spark.kstest", { - data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5)) - df <- createDataFrame(data) - testResult <- spark.kstest(df, "test", "norm") - stats <- summary(testResult) - - rStats <- ks.test(data$test, "pnorm", alternative = "two.sided") - - expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) - expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) - expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:") - - testResult <- spark.kstest(df, "test", "norm", -0.5) - stats <- summary(testResult) - - rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided") - - expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) - expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) - expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:") - - # Test print.summary.KSTest - printStats <- capture.output(print.summary.KSTest(stats)) - expect_match(printStats[1], "Kolmogorov-Smirnov test summary:") - expect_match(printStats[5], - "Low presumption against null hypothesis: Sample follows theoretical distribution. ") -}) - -test_that("spark.randomForest", { - # regression - data <- suppressWarnings(createDataFrame(longley)) - model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, - numTrees = 1) - - predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187, - 63.221, 63.639, 64.989, 63.761, - 66.019, 67.857, 68.169, 66.513, - 68.655, 69.564, 69.331, 70.551), - tolerance = 1e-4) - - stats <- summary(model) - expect_equal(stats$numTrees, 1) - expect_error(capture.output(stats), NA) - expect_true(length(capture.output(stats)) > 6) - - model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, - numTrees = 20, seed = 123) - predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070, - 63.53160, 64.05470, 65.12710, 64.30450, - 66.70910, 67.86125, 68.08700, 67.21865, - 68.89275, 69.53180, 69.39640, 69.68250), - - tolerance = 1e-4) - stats <- summary(model) - expect_equal(stats$numTrees, 20) - - modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$formula, stats2$formula) - expect_equal(stats$numFeatures, stats2$numFeatures) - expect_equal(stats$features, stats2$features) - expect_equal(stats$featureImportances, stats2$featureImportances) - expect_equal(stats$numTrees, stats2$numTrees) - expect_equal(stats$treeWeights, stats2$treeWeights) - - unlink(modelPath) - - # classification - data <- suppressWarnings(createDataFrame(iris)) - model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification", - maxDepth = 5, maxBins = 16) - - stats <- summary(model) - expect_equal(stats$numFeatures, 2) - expect_equal(stats$numTrees, 20) - expect_error(capture.output(stats), NA) - expect_true(length(capture.output(stats)) > 6) - # Test string prediction values - predictions <- collect(predict(model, data))$prediction - expect_equal(length(grep("setosa", predictions)), 50) - expect_equal(length(grep("versicolor", predictions)), 50) - - modelPath <- tempfile(pattern = "spark-randomForestClassification", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$depth, stats2$depth) - expect_equal(stats$numNodes, stats2$numNodes) - expect_equal(stats$numClasses, stats2$numClasses) - - unlink(modelPath) - - # Test numeric response variable - labelToIndex <- function(species) { - switch(as.character(species), - setosa = 0.0, - versicolor = 1.0, - virginica = 2.0 - ) - } - iris$NumericSpecies <- lapply(iris$Species, labelToIndex) - data <- suppressWarnings(createDataFrame(iris[-5])) - model <- spark.randomForest(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification", - maxDepth = 5, maxBins = 16) - stats <- summary(model) - expect_equal(stats$numFeatures, 2) - expect_equal(stats$numTrees, 20) - # Test numeric prediction values - predictions <- collect(predict(model, data))$prediction - expect_equal(length(grep("1.0", predictions)), 50) - expect_equal(length(grep("2.0", predictions)), 50) - - # spark.randomForest classification can work on libsvm data - data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"), - source = "libsvm") - model <- spark.randomForest(data, label ~ features, "classification") - expect_equal(summary(model)$numFeatures, 4) -}) - -test_that("spark.gbt", { - # regression - data <- suppressWarnings(createDataFrame(longley)) - model <- spark.gbt(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, seed = 123) - predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187, - 63.221, 63.639, 64.989, 63.761, - 66.019, 67.857, 68.169, 66.513, - 68.655, 69.564, 69.331, 70.551), - tolerance = 1e-4) - stats <- summary(model) - expect_equal(stats$numTrees, 20) - expect_equal(stats$formula, "Employed ~ .") - expect_equal(stats$numFeatures, 6) - expect_equal(length(stats$treeWeights), 20) - - modelPath <- tempfile(pattern = "spark-gbtRegression", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$formula, stats2$formula) - expect_equal(stats$numFeatures, stats2$numFeatures) - expect_equal(stats$features, stats2$features) - expect_equal(stats$featureImportances, stats2$featureImportances) - expect_equal(stats$numTrees, stats2$numTrees) - expect_equal(stats$treeWeights, stats2$treeWeights) - - unlink(modelPath) - - # classification - # label must be binary - GBTClassifier currently only supports binary classification. - iris2 <- iris[iris$Species != "virginica", ] - data <- suppressWarnings(createDataFrame(iris2)) - model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification") - stats <- summary(model) - expect_equal(stats$numFeatures, 2) - expect_equal(stats$numTrees, 20) - expect_error(capture.output(stats), NA) - expect_true(length(capture.output(stats)) > 6) - predictions <- collect(predict(model, data))$prediction - # test string prediction values - expect_equal(length(grep("setosa", predictions)), 50) - expect_equal(length(grep("versicolor", predictions)), 50) - - modelPath <- tempfile(pattern = "spark-gbtClassification", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$depth, stats2$depth) - expect_equal(stats$numNodes, stats2$numNodes) - expect_equal(stats$numClasses, stats2$numClasses) - - unlink(modelPath) - - iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1) - df <- suppressWarnings(createDataFrame(iris2)) - m <- spark.gbt(df, NumericSpecies ~ ., type = "classification") - s <- summary(m) - # test numeric prediction values - expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction)) - expect_equal(s$numFeatures, 5) - expect_equal(s$numTrees, 20) - - # spark.gbt classification can work on libsvm data - data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"), - source = "libsvm") - model <- spark.gbt(data, label ~ features, "classification") - expect_equal(summary(model)$numFeatures, 692) -}) - -sparkR.session.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org