http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_mllib.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R deleted file mode 100644 index 8fe3a87..0000000 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ /dev/null @@ -1,1205 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -library(testthat) - -context("MLlib functions") - -# Tests for MLlib functions in SparkR -sparkSession <- sparkR.session(enableHiveSupport = FALSE) - -absoluteSparkPath <- function(x) { - sparkHome <- sparkR.conf("spark.home") - file.path(sparkHome, x) -} - -test_that("formula of spark.glm", { - training <- suppressWarnings(createDataFrame(iris)) - # directly calling the spark API - # dot minus and intercept vs native glm - model <- spark.glm(training, Sepal_Width ~ . - Species + 0) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . 
- Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # feature interaction vs native glm - model <- spark.glm(training, Sepal_Width ~ Species:Sepal_Length) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # glm should work with long formula - training <- suppressWarnings(createDataFrame(iris)) - training$LongLongLongLongLongName <- training$Sepal_Width - training$VeryLongLongLongLonLongName <- training$Sepal_Length - training$AnotherLongLongLongLongName <- training$Species - model <- spark.glm(training, LongLongLongLongLongName ~ VeryLongLongLongLonLongName + - AnotherLongLongLongLongName) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("spark.glm and predict", { - training <- suppressWarnings(createDataFrame(iris)) - # gaussian family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # poisson family - model <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species, - family = poisson(link = identity)) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, - data = iris, family = poisson(link = identity)), iris)) - expect_true(all(abs(rVals - vals) < 
1e-6), rVals - vals) - - # Gamma family - x <- runif(100, -1, 1) - y <- rgamma(100, rate = 10 / exp(0.5 + 1.2 * x), shape = 10) - df <- as.DataFrame(as.data.frame(list(x = x, y = y))) - model <- glm(y ~ x, family = Gamma, df) - out <- capture.output(print(summary(model))) - expect_true(any(grepl("Dispersion parameter for gamma family", out))) - - # Test stats::predict is working - x <- rnorm(15) - y <- x + rnorm(15) - expect_equal(length(predict(lm(y ~ x))), 15) -}) - -test_that("spark.glm summary", { - # gaussian family - training <- suppressWarnings(createDataFrame(iris)) - stats <- summary(spark.glm(training, Sepal_Width ~ Sepal_Length + Species)) - - rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - out <- capture.output(print(stats)) - expect_match(out[2], "Deviance Residuals:") - expect_true(any(grepl("AIC: 59.22", out))) - - # binomial family - df <- suppressWarnings(createDataFrame(iris)) - training <- df[df$Species %in% c("versicolor", "virginica"), ] - stats <- summary(spark.glm(training, Species ~ Sepal_Length + Sepal_Width, - family = binomial(link = "logit"))) - - rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ] - rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, - family = binomial(link = "logit"))) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - 
expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Sepal_Width"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # Test spark.glm works with weighted dataset - a1 <- c(0, 1, 2, 3) - a2 <- c(5, 2, 1, 3) - w <- c(1, 2, 3, 4) - b <- c(1, 0, 1, 0) - data <- as.data.frame(cbind(a1, a2, w, b)) - df <- suppressWarnings(createDataFrame(data)) - - stats <- summary(spark.glm(df, b ~ a1 + a2, family = "binomial", weightCol = "w")) - rStats <- summary(glm(b ~ a1 + a2, family = "binomial", data = data, weights = w)) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-3)) - expect_true(all(rownames(stats$coefficients) == c("(Intercept)", "a1", "a2"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # Test summary works on base GLM models - baseModel <- stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) - baseSummary <- summary(baseModel) - expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) - - # Test spark.glm works with regularization parameter - data <- as.data.frame(cbind(a1, a2, b)) - df <- suppressWarnings(createDataFrame(data)) - regStats <- summary(spark.glm(df, b ~ a1 + a2, regParam = 1.0)) - expect_equal(regStats$aic, 13.32836, tolerance = 1e-4) # 13.32836 is from summary() result - - # Test spark.glm works on collinear data - A <- matrix(c(1, 2, 3, 4, 2, 4, 6, 8), 4, 2) - b <- c(1, 2, 3, 4) - data <- as.data.frame(cbind(A, b)) - df <- 
createDataFrame(data) - stats <- summary(spark.glm(df, b ~ . - 1)) - coefs <- unlist(stats$coefficients) - expect_true(all(abs(c(0.5, 0.25) - coefs) < 1e-4)) -}) - -test_that("spark.glm save/load", { - training <- suppressWarnings(createDataFrame(iris)) - m <- spark.glm(training, Sepal_Width ~ Sepal_Length + Species) - s <- summary(m) - - modelPath <- tempfile(pattern = "spark-glm", fileext = ".tmp") - write.ml(m, modelPath) - expect_error(write.ml(m, modelPath)) - write.ml(m, modelPath, overwrite = TRUE) - m2 <- read.ml(modelPath) - s2 <- summary(m2) - - expect_equal(s$coefficients, s2$coefficients) - expect_equal(rownames(s$coefficients), rownames(s2$coefficients)) - expect_equal(s$dispersion, s2$dispersion) - expect_equal(s$null.deviance, s2$null.deviance) - expect_equal(s$deviance, s2$deviance) - expect_equal(s$df.null, s2$df.null) - expect_equal(s$df.residual, s2$df.residual) - expect_equal(s$aic, s2$aic) - expect_equal(s$iter, s2$iter) - expect_true(!s$is.loaded) - expect_true(s2$is.loaded) - - unlink(modelPath) -}) - - - -test_that("formula of glm", { - training <- suppressWarnings(createDataFrame(iris)) - # dot minus and intercept vs native glm - model <- glm(Sepal_Width ~ . - Species + 0, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ . 
- Species + 0, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # feature interaction vs native glm - model <- glm(Sepal_Width ~ Species:Sepal_Length, data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Species:Sepal.Length, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # glm should work with long formula - training <- suppressWarnings(createDataFrame(iris)) - training$LongLongLongLongLongName <- training$Sepal_Width - training$VeryLongLongLongLonLongName <- training$Sepal_Length - training$AnotherLongLongLongLongName <- training$Species - model <- glm(LongLongLongLongLongName ~ VeryLongLongLongLonLongName + AnotherLongLongLongLongName, - data = training) - vals <- collect(select(predict(model, training), "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) -}) - -test_that("glm and predict", { - training <- suppressWarnings(createDataFrame(iris)) - # gaussian family - model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- predict(glm(Sepal.Width ~ Sepal.Length + Species, data = iris), iris) - expect_true(all(abs(rVals - vals) < 1e-6), rVals - vals) - - # poisson family - model <- glm(Sepal_Width ~ Sepal_Length + Species, data = training, - family = poisson(link = identity)) - prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "double") - vals <- collect(select(prediction, "prediction")) - rVals <- suppressWarnings(predict(glm(Sepal.Width ~ Sepal.Length + Species, - data = iris, family = poisson(link = identity)), iris)) - expect_true(all(abs(rVals - vals) < 1e-6), 
rVals - vals) - - # Test stats::predict is working - x <- rnorm(15) - y <- x + rnorm(15) - expect_equal(length(predict(lm(y ~ x))), 15) -}) - -test_that("glm summary", { - # gaussian family - training <- suppressWarnings(createDataFrame(iris)) - stats <- summary(glm(Sepal_Width ~ Sepal_Length + Species, data = training)) - - rStats <- summary(glm(Sepal.Width ~ Sepal.Length + Species, data = iris)) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Species_versicolor", "Species_virginica"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # binomial family - df <- suppressWarnings(createDataFrame(iris)) - training <- df[df$Species %in% c("versicolor", "virginica"), ] - stats <- summary(glm(Species ~ Sepal_Length + Sepal_Width, data = training, - family = binomial(link = "logit"))) - - rTraining <- iris[iris$Species %in% c("versicolor", "virginica"), ] - rStats <- summary(glm(Species ~ Sepal.Length + Sepal.Width, data = rTraining, - family = binomial(link = "logit"))) - - coefs <- unlist(stats$coefficients) - rCoefs <- unlist(rStats$coefficients) - expect_true(all(abs(rCoefs - coefs) < 1e-4)) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "Sepal_Length", "Sepal_Width"))) - expect_equal(stats$dispersion, rStats$dispersion) - expect_equal(stats$null.deviance, rStats$null.deviance) - expect_equal(stats$deviance, rStats$deviance) - expect_equal(stats$df.null, rStats$df.null) - expect_equal(stats$df.residual, rStats$df.residual) - expect_equal(stats$aic, rStats$aic) - - # Test summary works on base GLM models - baseModel <- 
stats::glm(Sepal.Width ~ Sepal.Length + Species, data = iris) - baseSummary <- summary(baseModel) - expect_true(abs(baseSummary$deviance - 12.19313) < 1e-4) -}) - -test_that("glm save/load", { - training <- suppressWarnings(createDataFrame(iris)) - m <- glm(Sepal_Width ~ Sepal_Length + Species, data = training) - s <- summary(m) - - modelPath <- tempfile(pattern = "glm", fileext = ".tmp") - write.ml(m, modelPath) - expect_error(write.ml(m, modelPath)) - write.ml(m, modelPath, overwrite = TRUE) - m2 <- read.ml(modelPath) - s2 <- summary(m2) - - expect_equal(s$coefficients, s2$coefficients) - expect_equal(rownames(s$coefficients), rownames(s2$coefficients)) - expect_equal(s$dispersion, s2$dispersion) - expect_equal(s$null.deviance, s2$null.deviance) - expect_equal(s$deviance, s2$deviance) - expect_equal(s$df.null, s2$df.null) - expect_equal(s$df.residual, s2$df.residual) - expect_equal(s$aic, s2$aic) - expect_equal(s$iter, s2$iter) - expect_true(!s$is.loaded) - expect_true(s2$is.loaded) - - unlink(modelPath) -}) - -test_that("spark.kmeans", { - newIris <- iris - newIris$Species <- NULL - training <- suppressWarnings(createDataFrame(newIris)) - - take(training, 1) - - model <- spark.kmeans(data = training, ~ ., k = 2, maxIter = 10, initMode = "random") - sample <- take(select(predict(model, training), "prediction"), 1) - expect_equal(typeof(sample$prediction), "integer") - expect_equal(sample$prediction, 1) - - # Test stats::kmeans is working - statsModel <- kmeans(x = newIris, centers = 2) - expect_equal(sort(unique(statsModel$cluster)), c(1, 2)) - - # Test fitted works on KMeans - fitted.model <- fitted(model) - expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction), c(0, 1)) - - # Test summary works on KMeans - summary.model <- summary(model) - cluster <- summary.model$cluster - k <- summary.model$k - expect_equal(k, 2) - expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction), c(0, 1)) - - # Test model save/load 
- modelPath <- tempfile(pattern = "spark-kmeans", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - summary2 <- summary(model2) - expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size))) - expect_equal(summary.model$coefficients, summary2$coefficients) - expect_true(!summary.model$is.loaded) - expect_true(summary2$is.loaded) - - unlink(modelPath) - - # Test Kmeans on dataset that is sensitive to seed value - col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) - col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) - col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) - cols <- as.data.frame(cbind(col1, col2, col3)) - df <- createDataFrame(cols) - - model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, - initMode = "random", seed = 1, tol = 1E-5) - model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, - initMode = "random", seed = 22222, tol = 1E-5) - - summary.model1 <- summary(model1) - summary.model2 <- summary(model2) - cluster1 <- summary.model1$cluster - cluster2 <- summary.model2$cluster - clusterSize1 <- summary.model1$clusterSize - clusterSize2 <- summary.model2$clusterSize - - # The predicted clusters are different - expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction), - c(0, 1, 2, 3)) - expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction), - c(0, 1, 2)) - expect_equal(clusterSize1, 4) - expect_equal(clusterSize2, 3) -}) - -test_that("spark.mlp", { - df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"), - source = "libsvm") - model <- spark.mlp(df, label ~ features, blockSize = 128, layers = c(4, 5, 4, 3), - solver = "l-bfgs", maxIter = 100, tol = 0.5, stepSize = 1, seed = 1) - - # Test summary method - summary <- summary(model) - expect_equal(summary$numOfInputs, 4) - expect_equal(summary$numOfOutputs, 3) - expect_equal(summary$layers, c(4, 5, 4, 3)) - 
expect_equal(length(summary$weights), 64) - expect_equal(head(summary$weights, 5), list(-0.878743, 0.2154151, -1.16304, -0.6583214, 1.009825), - tolerance = 1e-6) - - # Test predict method - mlpTestDF <- df - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 6), c("1.0", "0.0", "0.0", "0.0", "0.0", "0.0")) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-mlp", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - summary2 <- summary(model2) - - expect_equal(summary2$numOfInputs, 4) - expect_equal(summary2$numOfOutputs, 3) - expect_equal(summary2$layers, c(4, 5, 4, 3)) - expect_equal(length(summary2$weights), 64) - - unlink(modelPath) - - # Test default parameter - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3)) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - # Test illegal parameter - expect_error(spark.mlp(df, label ~ features, layers = NULL), - "layers must be a integer vector with length > 1.") - expect_error(spark.mlp(df, label ~ features, layers = c()), - "layers must be a integer vector with length > 1.") - expect_error(spark.mlp(df, label ~ features, layers = c(3)), - "layers must be a integer vector with length > 1.") - - # Test random seed - # default seed - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - # seed equals 10 - model <- spark.mlp(df, label ~ features, layers = c(4, 5, 4, 3), maxIter = 10, seed = 10) - mlpPredictions <- 
collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - # test initialWeights - model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights = - c(0, 0, 0, 0, 0, 5, 5, 5, 5, 5, 9, 9, 9, 9, 9)) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2, initialWeights = - c(0.0, 0.0, 0.0, 0.0, 0.0, 5.0, 5.0, 5.0, 5.0, 5.0, 9.0, 9.0, 9.0, 9.0, 9.0)) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "2.0", "1.0", "2.0", "2.0", "1.0", "0.0")) - - model <- spark.mlp(df, label ~ features, layers = c(4, 3), maxIter = 2) - mlpPredictions <- collect(select(predict(model, mlpTestDF), "prediction")) - expect_equal(head(mlpPredictions$prediction, 10), - c("1.0", "1.0", "1.0", "1.0", "0.0", "1.0", "0.0", "2.0", "1.0", "0.0")) - - # Test formula works well - df <- suppressWarnings(createDataFrame(iris)) - model <- spark.mlp(df, Species ~ Sepal_Length + Sepal_Width + Petal_Length + Petal_Width, - layers = c(4, 3)) - summary <- summary(model) - expect_equal(summary$numOfInputs, 4) - expect_equal(summary$numOfOutputs, 3) - expect_equal(summary$layers, c(4, 3)) - expect_equal(length(summary$weights), 15) - expect_equal(head(summary$weights, 5), list(-1.1957257, -5.2693685, 7.4489734, -6.3751413, - -10.2376130), tolerance = 1e-6) -}) - -test_that("spark.naiveBayes", { - # R code to reproduce the result. - # We do not support instance weights yet. So we ignore the frequencies. 
- # - #' library(e1071) - #' t <- as.data.frame(Titanic) - #' t1 <- t[t$Freq > 0, -5] - #' m <- naiveBayes(Survived ~ ., data = t1) - #' m - #' predict(m, t1) - # - # -- output of 'm' - # - # A-priori probabilities: - # Y - # No Yes - # 0.4166667 0.5833333 - # - # Conditional probabilities: - # Class - # Y 1st 2nd 3rd Crew - # No 0.2000000 0.2000000 0.4000000 0.2000000 - # Yes 0.2857143 0.2857143 0.2857143 0.1428571 - # - # Sex - # Y Male Female - # No 0.5 0.5 - # Yes 0.5 0.5 - # - # Age - # Y Child Adult - # No 0.2000000 0.8000000 - # Yes 0.4285714 0.5714286 - # - # -- output of 'predict(m, t1)' - # - # Yes Yes Yes Yes No No Yes Yes No No Yes Yes Yes Yes Yes Yes Yes Yes No No Yes Yes No No - # - - t <- as.data.frame(Titanic) - t1 <- t[t$Freq > 0, -5] - df <- suppressWarnings(createDataFrame(t1)) - m <- spark.naiveBayes(df, Survived ~ ., smoothing = 0.0) - s <- summary(m) - expect_equal(as.double(s$apriori[1, "Yes"]), 0.5833333, tolerance = 1e-6) - expect_equal(sum(s$apriori), 1) - expect_equal(as.double(s$tables["Yes", "Age_Adult"]), 0.5714286, tolerance = 1e-6) - p <- collect(select(predict(m, df), "prediction")) - expect_equal(p$prediction, c("Yes", "Yes", "Yes", "Yes", "No", "No", "Yes", "Yes", "No", "No", - "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "Yes", "No", "No", - "Yes", "Yes", "No", "No")) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-naiveBayes", fileext = ".tmp") - write.ml(m, modelPath) - expect_error(write.ml(m, modelPath)) - write.ml(m, modelPath, overwrite = TRUE) - m2 <- read.ml(modelPath) - s2 <- summary(m2) - expect_equal(s$apriori, s2$apriori) - expect_equal(s$tables, s2$tables) - - unlink(modelPath) - - # Test e1071::naiveBayes - if (requireNamespace("e1071", quietly = TRUE)) { - expect_error(m <- e1071::naiveBayes(Survived ~ ., data = t1), NA) - expect_equal(as.character(predict(m, t1[1, ])), "Yes") - } - - # Test numeric response variable - t1$NumericSurvived <- ifelse(t1$Survived == "No", 0, 1) - t2 <- t1[-4] - 
df <- suppressWarnings(createDataFrame(t2)) - m <- spark.naiveBayes(df, NumericSurvived ~ ., smoothing = 0.0) - s <- summary(m) - expect_equal(as.double(s$apriori[1, 1]), 0.5833333, tolerance = 1e-6) - expect_equal(sum(s$apriori), 1) - expect_equal(as.double(s$tables[1, "Age_Adult"]), 0.5714286, tolerance = 1e-6) -}) - -test_that("spark.survreg", { - # R code to reproduce the result. - # - #' rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0), - #' x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1)) - #' library(survival) - #' model <- survreg(Surv(time, status) ~ x + sex, rData) - #' summary(model) - #' predict(model, data) - # - # -- output of 'summary(model)' - # - # Value Std. Error z p - # (Intercept) 1.315 0.270 4.88 1.07e-06 - # x -0.190 0.173 -1.10 2.72e-01 - # sex -0.253 0.329 -0.77 4.42e-01 - # Log(scale) -1.160 0.396 -2.93 3.41e-03 - # - # -- output of 'predict(model, data)' - # - # 1 2 3 4 5 6 7 - # 3.724591 2.545368 3.079035 3.079035 2.390146 2.891269 2.891269 - # - data <- list(list(4, 1, 0, 0), list(3, 1, 2, 0), list(1, 1, 1, 0), - list(1, 0, 1, 0), list(2, 1, 1, 1), list(2, 1, 0, 1), list(3, 0, 0, 1)) - df <- createDataFrame(data, c("time", "status", "x", "sex")) - model <- spark.survreg(df, Surv(time, status) ~ x + sex) - stats <- summary(model) - coefs <- as.vector(stats$coefficients[, 1]) - rCoefs <- c(1.3149571, -0.1903409, -0.2532618, -1.1599800) - expect_equal(coefs, rCoefs, tolerance = 1e-4) - expect_true(all( - rownames(stats$coefficients) == - c("(Intercept)", "x", "sex", "Log(scale)"))) - p <- collect(select(predict(model, df), "prediction")) - expect_equal(p$prediction, c(3.724591, 2.545368, 3.079035, 3.079035, - 2.390146, 2.891269, 2.891269), tolerance = 1e-4) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-survreg", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - 
stats2 <- summary(model2) - coefs2 <- as.vector(stats2$coefficients[, 1]) - expect_equal(coefs, coefs2) - expect_equal(rownames(stats$coefficients), rownames(stats2$coefficients)) - - unlink(modelPath) - - # Test survival::survreg - if (requireNamespace("survival", quietly = TRUE)) { - rData <- list(time = c(4, 3, 1, 1, 2, 2, 3), status = c(1, 1, 1, 0, 1, 1, 0), - x = c(0, 2, 1, 1, 1, 0, 0), sex = c(0, 0, 0, 0, 1, 1, 1)) - expect_error( - model <- survival::survreg(formula = survival::Surv(time, status) ~ x + sex, data = rData), - NA) - expect_equal(predict(model, rData)[[1]], 3.724591, tolerance = 1e-4) - } -}) - -test_that("spark.isotonicRegression", { - label <- c(7.0, 5.0, 3.0, 5.0, 1.0) - feature <- c(0.0, 1.0, 2.0, 3.0, 4.0) - weight <- c(1.0, 1.0, 1.0, 1.0, 1.0) - data <- as.data.frame(cbind(label, feature, weight)) - df <- suppressWarnings(createDataFrame(data)) - - model <- spark.isoreg(df, label ~ feature, isotonic = FALSE, - weightCol = "weight") - # only allow one variable on the right hand side of the formula - expect_error(model2 <- spark.isoreg(df, ~., isotonic = FALSE)) - result <- summary(model) - expect_equal(result$predictions, list(7, 5, 4, 4, 1)) - - # Test model prediction - predict_data <- list(list(-2.0), list(-1.0), list(0.5), - list(0.75), list(1.0), list(2.0), list(9.0)) - predict_df <- createDataFrame(predict_data, c("feature")) - predict_result <- collect(select(predict(model, predict_df), "prediction")) - expect_equal(predict_result$prediction, c(7.0, 7.0, 6.0, 5.5, 5.0, 4.0, 1.0)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-isotonicRegression", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - expect_equal(result, summary(model2)) - - unlink(modelPath) -}) - -test_that("spark.logit", { - # R code to reproduce the result. 
- # nolint start - #' library(glmnet) - #' iris.x = as.matrix(iris[, 1:4]) - #' iris.y = as.factor(as.character(iris[, 5])) - #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5) - #' coef(logit) - # - # $setosa - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # 1.0981324 - # Sepal.Length -0.2909860 - # Sepal.Width 0.5510907 - # Petal.Length -0.1915217 - # Petal.Width -0.4211946 - # - # $versicolor - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # 1.520061e+00 - # Sepal.Length 2.524501e-02 - # Sepal.Width -5.310313e-01 - # Petal.Length 3.656543e-02 - # Petal.Width -3.144464e-05 - # - # $virginica - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # -2.61819385 - # Sepal.Length 0.26574097 - # Sepal.Width -0.02005932 - # Petal.Length 0.15495629 - # Petal.Width 0.42122607 - # nolint end - - # Test multinomial logistic regression againt three classes - df <- suppressWarnings(createDataFrame(iris)) - model <- spark.logit(df, Species ~ ., regParam = 0.5) - summary <- summary(model) - versicolorCoefsR <- c(1.52, 0.03, -0.53, 0.04, 0.00) - virginicaCoefsR <- c(-2.62, 0.27, -0.02, 0.16, 0.42) - setosaCoefsR <- c(1.10, -0.29, 0.55, -0.19, -0.42) - versicolorCoefs <- unlist(summary$coefficients[, "versicolor"]) - virginicaCoefs <- unlist(summary$coefficients[, "virginica"]) - setosaCoefs <- unlist(summary$coefficients[, "setosa"]) - expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1)) - expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1)) - expect_true(all(abs(setosaCoefs - setosaCoefs) < 0.1)) - - # Test model save and load - modelPath <- tempfile(pattern = "spark-logit", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - coefs <- summary(model)$coefficients - coefs2 <- summary(model2)$coefficients - expect_equal(coefs, coefs2) - unlink(modelPath) - - # R code to reproduce the result. 
- # nolint start - #' library(glmnet) - #' iris2 <- iris[iris$Species %in% c("versicolor", "virginica"), ] - #' iris.x = as.matrix(iris2[, 1:4]) - #' iris.y = as.factor(as.character(iris2[, 5])) - #' logit = glmnet(iris.x, iris.y, family="multinomial", alpha=0, lambda=0.5) - #' coef(logit) - # - # $versicolor - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # 3.93844796 - # Sepal.Length -0.13538675 - # Sepal.Width -0.02386443 - # Petal.Length -0.35076451 - # Petal.Width -0.77971954 - # - # $virginica - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # -3.93844796 - # Sepal.Length 0.13538675 - # Sepal.Width 0.02386443 - # Petal.Length 0.35076451 - # Petal.Width 0.77971954 - # - #' logit = glmnet(iris.x, iris.y, family="binomial", alpha=0, lambda=0.5) - #' coef(logit) - # - # 5 x 1 sparse Matrix of class "dgCMatrix" - # s0 - # (Intercept) -6.0824412 - # Sepal.Length 0.2458260 - # Sepal.Width 0.1642093 - # Petal.Length 0.4759487 - # Petal.Width 1.0383948 - # - # nolint end - - # Test multinomial logistic regression againt two classes - df <- suppressWarnings(createDataFrame(iris)) - training <- df[df$Species %in% c("versicolor", "virginica"), ] - model <- spark.logit(training, Species ~ ., regParam = 0.5, family = "multinomial") - summary <- summary(model) - versicolorCoefsR <- c(3.94, -0.16, -0.02, -0.35, -0.78) - virginicaCoefsR <- c(-3.94, 0.16, -0.02, 0.35, 0.78) - versicolorCoefs <- unlist(summary$coefficients[, "versicolor"]) - virginicaCoefs <- unlist(summary$coefficients[, "virginica"]) - expect_true(all(abs(versicolorCoefsR - versicolorCoefs) < 0.1)) - expect_true(all(abs(virginicaCoefsR - virginicaCoefs) < 0.1)) - - # Test binomial logistic regression againt two classes - model <- spark.logit(training, Species ~ ., regParam = 0.5) - summary <- summary(model) - coefsR <- c(-6.08, 0.25, 0.16, 0.48, 1.04) - coefs <- unlist(summary$coefficients[, "Estimate"]) - expect_true(all(abs(coefsR - coefs) < 0.1)) - - # Test prediction with string label - 
prediction <- predict(model, training) - expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character") - expected <- c("versicolor", "versicolor", "virginica", "versicolor", "versicolor", - "versicolor", "versicolor", "versicolor", "versicolor", "versicolor") - expect_equal(as.list(take(select(prediction, "prediction"), 10))[[1]], expected) - - # Test prediction with numeric label - label <- c(0.0, 0.0, 0.0, 1.0, 1.0) - feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) - data <- as.data.frame(cbind(label, feature)) - df <- createDataFrame(data) - model <- spark.logit(df, label ~ feature) - prediction <- collect(select(predict(model, df), "prediction")) - expect_equal(prediction$prediction, c("0.0", "0.0", "1.0", "1.0", "0.0")) -}) - -test_that("spark.gaussianMixture", { - # R code to reproduce the result. - # nolint start - #' library(mvtnorm) - #' set.seed(1) - #' a <- rmvnorm(7, c(0, 0)) - #' b <- rmvnorm(8, c(10, 10)) - #' data <- rbind(a, b) - #' model <- mvnormalmixEM(data, k = 2) - #' model$lambda - # - # [1] 0.4666667 0.5333333 - # - #' model$mu - # - # [1] 0.11731091 -0.06192351 - # [1] 10.363673 9.897081 - # - #' model$sigma - # - # [[1]] - # [,1] [,2] - # [1,] 0.62049934 0.06880802 - # [2,] 0.06880802 1.27431874 - # - # [[2]] - # [,1] [,2] - # [1,] 0.2961543 0.160783 - # [2,] 0.1607830 1.008878 - # nolint end - data <- list(list(-0.6264538, 0.1836433), list(-0.8356286, 1.5952808), - list(0.3295078, -0.8204684), list(0.4874291, 0.7383247), - list(0.5757814, -0.3053884), list(1.5117812, 0.3898432), - list(-0.6212406, -2.2146999), list(11.1249309, 9.9550664), - list(9.9838097, 10.9438362), list(10.8212212, 10.5939013), - list(10.9189774, 10.7821363), list(10.0745650, 8.0106483), - list(10.6198257, 9.9438713), list(9.8442045, 8.5292476), - list(9.5218499, 10.4179416)) - df <- createDataFrame(data, c("x1", "x2")) - model <- spark.gaussianMixture(df, ~ x1 + x2, k = 2) - stats <- summary(model) - rLambda <- 
c(0.4666667, 0.5333333) - rMu <- c(0.11731091, -0.06192351, 10.363673, 9.897081) - rSigma <- c(0.62049934, 0.06880802, 0.06880802, 1.27431874, - 0.2961543, 0.160783, 0.1607830, 1.008878) - expect_equal(stats$lambda, rLambda, tolerance = 1e-3) - expect_equal(unlist(stats$mu), rMu, tolerance = 1e-3) - expect_equal(unlist(stats$sigma), rSigma, tolerance = 1e-3) - p <- collect(select(predict(model, df), "prediction")) - expect_equal(p$prediction, c(0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-gaussianMixture", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$lambda, stats2$lambda) - expect_equal(unlist(stats$mu), unlist(stats2$mu)) - expect_equal(unlist(stats$sigma), unlist(stats2$sigma)) - - unlink(modelPath) -}) - -test_that("spark.lda with libsvm", { - text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm") - model <- spark.lda(text, optimizer = "em") - - stats <- summary(model, 10) - isDistributed <- stats$isDistributed - logLikelihood <- stats$logLikelihood - logPerplexity <- stats$logPerplexity - vocabSize <- stats$vocabSize - topics <- stats$topicTopTerms - weights <- stats$topicTopTermsWeights - vocabulary <- stats$vocabulary - - expect_true(isDistributed) - expect_true(logLikelihood <= 0 & is.finite(logLikelihood)) - expect_true(logPerplexity >= 0 & is.finite(logPerplexity)) - expect_equal(vocabSize, 11) - expect_true(is.null(vocabulary)) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-lda", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - - expect_true(stats2$isDistributed) - expect_equal(logLikelihood, 
stats2$logLikelihood) - expect_equal(logPerplexity, stats2$logPerplexity) - expect_equal(vocabSize, stats2$vocabSize) - expect_equal(vocabulary, stats2$vocabulary) - - unlink(modelPath) -}) - -test_that("spark.lda with text input", { - text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt")) - model <- spark.lda(text, optimizer = "online", features = "value") - - stats <- summary(model) - isDistributed <- stats$isDistributed - logLikelihood <- stats$logLikelihood - logPerplexity <- stats$logPerplexity - vocabSize <- stats$vocabSize - topics <- stats$topicTopTerms - weights <- stats$topicTopTermsWeights - vocabulary <- stats$vocabulary - - expect_false(isDistributed) - expect_true(logLikelihood <= 0 & is.finite(logLikelihood)) - expect_true(logPerplexity >= 0 & is.finite(logPerplexity)) - expect_equal(vocabSize, 10) - expect_true(setequal(stats$vocabulary, c("0", "1", "2", "3", "4", "5", "6", "7", "8", "9"))) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-lda-text", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - - expect_false(stats2$isDistributed) - expect_equal(logLikelihood, stats2$logLikelihood) - expect_equal(logPerplexity, stats2$logPerplexity) - expect_equal(vocabSize, stats2$vocabSize) - expect_true(all.equal(vocabulary, stats2$vocabulary)) - - unlink(modelPath) -}) - -test_that("spark.posterior and spark.perplexity", { - text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt")) - model <- spark.lda(text, features = "value", k = 3) - - # Assert perplexities are equal - stats <- summary(model) - logPerplexity <- spark.perplexity(model, text) - expect_equal(logPerplexity, stats$logPerplexity) - - # Assert the sum of every topic distribution is equal to 1 - posterior <- spark.posterior(model, text) - local.posterior <- collect(posterior)$topicDistribution - 
expect_equal(length(local.posterior), sum(unlist(local.posterior))) -}) - -test_that("spark.als", { - data <- list(list(0, 0, 4.0), list(0, 1, 2.0), list(1, 1, 3.0), list(1, 2, 4.0), - list(2, 1, 1.0), list(2, 2, 5.0)) - df <- createDataFrame(data, c("user", "item", "score")) - model <- spark.als(df, ratingCol = "score", userCol = "user", itemCol = "item", - rank = 10, maxIter = 5, seed = 0, regParam = 0.1) - stats <- summary(model) - expect_equal(stats$rank, 10) - test <- createDataFrame(list(list(0, 2), list(1, 0), list(2, 0)), c("user", "item")) - predictions <- collect(predict(model, test)) - - expect_equal(predictions$prediction, c(-0.1380762, 2.6258414, -1.5018409), - tolerance = 1e-4) - - # Test model save/load - modelPath <- tempfile(pattern = "spark-als", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats2$rating, "score") - userFactors <- collect(stats$userFactors) - itemFactors <- collect(stats$itemFactors) - userFactors2 <- collect(stats2$userFactors) - itemFactors2 <- collect(stats2$itemFactors) - - orderUser <- order(userFactors$id) - orderUser2 <- order(userFactors2$id) - expect_equal(userFactors$id[orderUser], userFactors2$id[orderUser2]) - expect_equal(userFactors$features[orderUser], userFactors2$features[orderUser2]) - - orderItem <- order(itemFactors$id) - orderItem2 <- order(itemFactors2$id) - expect_equal(itemFactors$id[orderItem], itemFactors2$id[orderItem2]) - expect_equal(itemFactors$features[orderItem], itemFactors2$features[orderItem2]) - - unlink(modelPath) -}) - -test_that("spark.kstest", { - data <- data.frame(test = c(0.1, 0.15, 0.2, 0.3, 0.25, -1, -0.5)) - df <- createDataFrame(data) - testResult <- spark.kstest(df, "test", "norm") - stats <- summary(testResult) - - rStats <- ks.test(data$test, "pnorm", alternative = "two.sided") - - expect_equal(stats$p.value, 
rStats$p.value, tolerance = 1e-4) - expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) - expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:") - - testResult <- spark.kstest(df, "test", "norm", -0.5) - stats <- summary(testResult) - - rStats <- ks.test(data$test, "pnorm", -0.5, 1, alternative = "two.sided") - - expect_equal(stats$p.value, rStats$p.value, tolerance = 1e-4) - expect_equal(stats$statistic, unname(rStats$statistic), tolerance = 1e-4) - expect_match(capture.output(stats)[1], "Kolmogorov-Smirnov test summary:") - - # Test print.summary.KSTest - printStats <- capture.output(print.summary.KSTest(stats)) - expect_match(printStats[1], "Kolmogorov-Smirnov test summary:") - expect_match(printStats[5], - "Low presumption against null hypothesis: Sample follows theoretical distribution. ") -}) - -test_that("spark.randomForest", { - # regression - data <- suppressWarnings(createDataFrame(longley)) - model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, - numTrees = 1) - - predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187, - 63.221, 63.639, 64.989, 63.761, - 66.019, 67.857, 68.169, 66.513, - 68.655, 69.564, 69.331, 70.551), - tolerance = 1e-4) - - stats <- summary(model) - expect_equal(stats$numTrees, 1) - expect_error(capture.output(stats), NA) - expect_true(length(capture.output(stats)) > 6) - - model <- spark.randomForest(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, - numTrees = 20, seed = 123) - predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.32820, 61.22315, 60.69025, 62.11070, - 63.53160, 64.05470, 65.12710, 64.30450, - 66.70910, 67.86125, 68.08700, 67.21865, - 68.89275, 69.53180, 69.39640, 69.68250), - - tolerance = 1e-4) - stats <- summary(model) - expect_equal(stats$numTrees, 20) - - modelPath <- tempfile(pattern = "spark-randomForestRegression", fileext 
= ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$formula, stats2$formula) - expect_equal(stats$numFeatures, stats2$numFeatures) - expect_equal(stats$features, stats2$features) - expect_equal(stats$featureImportances, stats2$featureImportances) - expect_equal(stats$numTrees, stats2$numTrees) - expect_equal(stats$treeWeights, stats2$treeWeights) - - unlink(modelPath) - - # classification - data <- suppressWarnings(createDataFrame(iris)) - model <- spark.randomForest(data, Species ~ Petal_Length + Petal_Width, "classification", - maxDepth = 5, maxBins = 16) - - stats <- summary(model) - expect_equal(stats$numFeatures, 2) - expect_equal(stats$numTrees, 20) - expect_error(capture.output(stats), NA) - expect_true(length(capture.output(stats)) > 6) - # Test string prediction values - predictions <- collect(predict(model, data))$prediction - expect_equal(length(grep("setosa", predictions)), 50) - expect_equal(length(grep("versicolor", predictions)), 50) - - modelPath <- tempfile(pattern = "spark-randomForestClassification", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$depth, stats2$depth) - expect_equal(stats$numNodes, stats2$numNodes) - expect_equal(stats$numClasses, stats2$numClasses) - - unlink(modelPath) - - # Test numeric response variable - labelToIndex <- function(species) { - switch(as.character(species), - setosa = 0.0, - versicolor = 1.0, - virginica = 2.0 - ) - } - iris$NumericSpecies <- lapply(iris$Species, labelToIndex) - data <- suppressWarnings(createDataFrame(iris[-5])) - model <- spark.randomForest(data, NumericSpecies ~ Petal_Length + Petal_Width, "classification", - maxDepth = 5, maxBins = 16) - stats <- summary(model) - 
expect_equal(stats$numFeatures, 2) - expect_equal(stats$numTrees, 20) - # Test numeric prediction values - predictions <- collect(predict(model, data))$prediction - expect_equal(length(grep("1.0", predictions)), 50) - expect_equal(length(grep("2.0", predictions)), 50) - - # spark.randomForest classification can work on libsvm data - data <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"), - source = "libsvm") - model <- spark.randomForest(data, label ~ features, "classification") - expect_equal(summary(model)$numFeatures, 4) -}) - -test_that("spark.gbt", { - # regression - data <- suppressWarnings(createDataFrame(longley)) - model <- spark.gbt(data, Employed ~ ., "regression", maxDepth = 5, maxBins = 16, seed = 123) - predictions <- collect(predict(model, data)) - expect_equal(predictions$prediction, c(60.323, 61.122, 60.171, 61.187, - 63.221, 63.639, 64.989, 63.761, - 66.019, 67.857, 68.169, 66.513, - 68.655, 69.564, 69.331, 70.551), - tolerance = 1e-4) - stats <- summary(model) - expect_equal(stats$numTrees, 20) - expect_equal(stats$formula, "Employed ~ .") - expect_equal(stats$numFeatures, 6) - expect_equal(length(stats$treeWeights), 20) - - modelPath <- tempfile(pattern = "spark-gbtRegression", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$formula, stats2$formula) - expect_equal(stats$numFeatures, stats2$numFeatures) - expect_equal(stats$features, stats2$features) - expect_equal(stats$featureImportances, stats2$featureImportances) - expect_equal(stats$numTrees, stats2$numTrees) - expect_equal(stats$treeWeights, stats2$treeWeights) - - unlink(modelPath) - - # classification - # label must be binary - GBTClassifier currently only supports binary classification. 
- iris2 <- iris[iris$Species != "virginica", ] - data <- suppressWarnings(createDataFrame(iris2)) - model <- spark.gbt(data, Species ~ Petal_Length + Petal_Width, "classification") - stats <- summary(model) - expect_equal(stats$numFeatures, 2) - expect_equal(stats$numTrees, 20) - expect_error(capture.output(stats), NA) - expect_true(length(capture.output(stats)) > 6) - predictions <- collect(predict(model, data))$prediction - # test string prediction values - expect_equal(length(grep("setosa", predictions)), 50) - expect_equal(length(grep("versicolor", predictions)), 50) - - modelPath <- tempfile(pattern = "spark-gbtClassification", fileext = ".tmp") - write.ml(model, modelPath) - expect_error(write.ml(model, modelPath)) - write.ml(model, modelPath, overwrite = TRUE) - model2 <- read.ml(modelPath) - stats2 <- summary(model2) - expect_equal(stats$depth, stats2$depth) - expect_equal(stats$numNodes, stats2$numNodes) - expect_equal(stats$numClasses, stats2$numClasses) - - unlink(modelPath) - - iris2$NumericSpecies <- ifelse(iris2$Species == "setosa", 0, 1) - df <- suppressWarnings(createDataFrame(iris2)) - m <- spark.gbt(df, NumericSpecies ~ ., type = "classification") - s <- summary(m) - # test numeric prediction values - expect_equal(iris2$NumericSpecies, as.double(collect(predict(m, df))$prediction)) - expect_equal(s$numFeatures, 5) - expect_equal(s$numTrees, 20) - - # spark.gbt classification can work on libsvm data - data <- read.df(absoluteSparkPath("data/mllib/sample_binary_classification_data.txt"), - source = "libsvm") - model <- spark.gbt(data, label ~ features, "classification") - expect_equal(summary(model)$numFeatures, 692) -}) - -sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_parallelize_collect.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_parallelize_collect.R b/R/pkg/inst/tests/testthat/test_parallelize_collect.R deleted file mode 100644 index 55972e1..0000000 --- a/R/pkg/inst/tests/testthat/test_parallelize_collect.R +++ /dev/null @@ -1,112 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("parallelize() and collect()") - -# Mock data -numVector <- c(-10:97) -numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10) -strVector <- c("Dexter Morgan: I suppose I should be upset, even feel", - "violated, but I'm not. No, in fact, I think this is a friendly", - "message, like \"Hey, wanna play?\" and yes, I want to play. ", - "I really, really do.") -strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ", - "other times it helps me control the chaos.", - "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ", - "raising me. But they're both dead now. I didn't kill them. 
Honest.") - -numPairs <- list(list(1, 1), list(1, 2), list(2, 2), list(2, 3)) -strPairs <- list(list(strList, strList), list(strList, strList)) - -# JavaSparkContext handle -sparkSession <- sparkR.session(enableHiveSupport = FALSE) -jsc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -# Tests - -test_that("parallelize() on simple vectors and lists returns an RDD", { - numVectorRDD <- parallelize(jsc, numVector, 1) - numVectorRDD2 <- parallelize(jsc, numVector, 10) - numListRDD <- parallelize(jsc, numList, 1) - numListRDD2 <- parallelize(jsc, numList, 4) - strVectorRDD <- parallelize(jsc, strVector, 2) - strVectorRDD2 <- parallelize(jsc, strVector, 3) - strListRDD <- parallelize(jsc, strList, 4) - strListRDD2 <- parallelize(jsc, strList, 1) - - rdds <- c(numVectorRDD, - numVectorRDD2, - numListRDD, - numListRDD2, - strVectorRDD, - strVectorRDD2, - strListRDD, - strListRDD2) - - for (rdd in rdds) { - expect_is(rdd, "RDD") - expect_true(.hasSlot(rdd, "jrdd") - && inherits(rdd@jrdd, "jobj") - && isInstanceOf(rdd@jrdd, "org.apache.spark.api.java.JavaRDD")) - } -}) - -test_that("collect(), following a parallelize(), gives back the original collections", { - numVectorRDD <- parallelize(jsc, numVector, 10) - expect_equal(collectRDD(numVectorRDD), as.list(numVector)) - - numListRDD <- parallelize(jsc, numList, 1) - numListRDD2 <- parallelize(jsc, numList, 4) - expect_equal(collectRDD(numListRDD), as.list(numList)) - expect_equal(collectRDD(numListRDD2), as.list(numList)) - - strVectorRDD <- parallelize(jsc, strVector, 2) - strVectorRDD2 <- parallelize(jsc, strVector, 3) - expect_equal(collectRDD(strVectorRDD), as.list(strVector)) - expect_equal(collectRDD(strVectorRDD2), as.list(strVector)) - - strListRDD <- parallelize(jsc, strList, 4) - strListRDD2 <- parallelize(jsc, strList, 1) - expect_equal(collectRDD(strListRDD), as.list(strList)) - expect_equal(collectRDD(strListRDD2), as.list(strList)) -}) - -test_that("regression: 
collect() following a parallelize() does not drop elements", { - # 10 %/% 6 = 1, ceiling(10 / 6) = 2 - collLen <- 10 - numPart <- 6 - expected <- runif(collLen) - actual <- collectRDD(parallelize(jsc, expected, numPart)) - expect_equal(actual, as.list(expected)) -}) - -test_that("parallelize() and collect() work for lists of pairs (pairwise data)", { - # use the pairwise logical to indicate pairwise data - numPairsRDDD1 <- parallelize(jsc, numPairs, 1) - numPairsRDDD2 <- parallelize(jsc, numPairs, 2) - numPairsRDDD3 <- parallelize(jsc, numPairs, 3) - expect_equal(collectRDD(numPairsRDDD1), numPairs) - expect_equal(collectRDD(numPairsRDDD2), numPairs) - expect_equal(collectRDD(numPairsRDDD3), numPairs) - # can also leave out the parameter name, if the params are supplied in order - strPairsRDDD1 <- parallelize(jsc, strPairs, 1) - strPairsRDDD2 <- parallelize(jsc, strPairs, 2) - expect_equal(collectRDD(strPairsRDDD1), strPairs) - expect_equal(collectRDD(strPairsRDDD2), strPairs) -}) - -sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_rdd.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_rdd.R b/R/pkg/inst/tests/testthat/test_rdd.R deleted file mode 100644 index 787ef51..0000000 --- a/R/pkg/inst/tests/testthat/test_rdd.R +++ /dev/null @@ -1,804 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("basic RDD functions") - -# JavaSparkContext handle -sparkSession <- sparkR.session(enableHiveSupport = FALSE) -sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -# Data -nums <- 1:10 -rdd <- parallelize(sc, nums, 2L) - -intPairs <- list(list(1L, -1), list(2L, 100), list(2L, 1), list(1L, 200)) -intRdd <- parallelize(sc, intPairs, 2L) - -test_that("get number of partitions in RDD", { - expect_equal(getNumPartitionsRDD(rdd), 2) - expect_equal(getNumPartitionsRDD(intRdd), 2) -}) - -test_that("first on RDD", { - expect_equal(firstRDD(rdd), 1) - newrdd <- lapply(rdd, function(x) x + 1) - expect_equal(firstRDD(newrdd), 2) -}) - -test_that("count and length on RDD", { - expect_equal(countRDD(rdd), 10) - expect_equal(lengthRDD(rdd), 10) -}) - -test_that("count by values and keys", { - mods <- lapply(rdd, function(x) { x %% 3 }) - actual <- countByValue(mods) - expected <- list(list(0, 3L), list(1, 4L), list(2, 3L)) - expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) - - actual <- countByKey(intRdd) - expected <- list(list(2L, 2L), list(1L, 2L)) - expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) -}) - -test_that("lapply on RDD", { - multiples <- lapply(rdd, function(x) { 2 * x }) - actual <- collectRDD(multiples) - expect_equal(actual, as.list(nums * 2)) -}) - -test_that("lapplyPartition on RDD", { - sums <- lapplyPartition(rdd, function(part) { sum(unlist(part)) }) - actual <- collectRDD(sums) - expect_equal(actual, list(15, 40)) -}) - -test_that("mapPartitions on RDD", { - 
sums <- mapPartitions(rdd, function(part) { sum(unlist(part)) }) - actual <- collectRDD(sums) - expect_equal(actual, list(15, 40)) -}) - -test_that("flatMap() on RDDs", { - flat <- flatMap(intRdd, function(x) { list(x, x) }) - actual <- collectRDD(flat) - expect_equal(actual, rep(intPairs, each = 2)) -}) - -test_that("filterRDD on RDD", { - filtered.rdd <- filterRDD(rdd, function(x) { x %% 2 == 0 }) - actual <- collectRDD(filtered.rdd) - expect_equal(actual, list(2, 4, 6, 8, 10)) - - filtered.rdd <- Filter(function(x) { x[[2]] < 0 }, intRdd) - actual <- collectRDD(filtered.rdd) - expect_equal(actual, list(list(1L, -1))) - - # Filter out all elements. - filtered.rdd <- filterRDD(rdd, function(x) { x > 10 }) - actual <- collectRDD(filtered.rdd) - expect_equal(actual, list()) -}) - -test_that("lookup on RDD", { - vals <- lookup(intRdd, 1L) - expect_equal(vals, list(-1, 200)) - - vals <- lookup(intRdd, 3L) - expect_equal(vals, list()) -}) - -test_that("several transformations on RDD (a benchmark on PipelinedRDD)", { - rdd2 <- rdd - for (i in 1:12) - rdd2 <- lapplyPartitionsWithIndex( - rdd2, function(partIndex, part) { - part <- as.list(unlist(part) * partIndex + i) - }) - rdd2 <- lapply(rdd2, function(x) x + x) - actual <- collectRDD(rdd2) - expected <- list(24, 24, 24, 24, 24, - 168, 170, 172, 174, 176) - expect_equal(actual, expected) -}) - -test_that("PipelinedRDD support actions: cache(), persist(), unpersist(), checkpoint()", { - # RDD - rdd2 <- rdd - # PipelinedRDD - rdd2 <- lapplyPartitionsWithIndex( - rdd2, - function(partIndex, part) { - part <- as.list(unlist(part) * partIndex) - }) - - cacheRDD(rdd2) - expect_true(rdd2@env$isCached) - rdd2 <- lapply(rdd2, function(x) x) - expect_false(rdd2@env$isCached) - - unpersistRDD(rdd2) - expect_false(rdd2@env$isCached) - - persistRDD(rdd2, "MEMORY_AND_DISK") - expect_true(rdd2@env$isCached) - rdd2 <- lapply(rdd2, function(x) x) - expect_false(rdd2@env$isCached) - - unpersistRDD(rdd2) - expect_false(rdd2@env$isCached) 
- - tempDir <- tempfile(pattern = "checkpoint") - setCheckpointDir(sc, tempDir) - checkpoint(rdd2) - expect_true(rdd2@env$isCheckpointed) - - rdd2 <- lapply(rdd2, function(x) x) - expect_false(rdd2@env$isCached) - expect_false(rdd2@env$isCheckpointed) - - # make sure the data is collectable - collectRDD(rdd2) - - unlink(tempDir) -}) - -test_that("reduce on RDD", { - sum <- reduce(rdd, "+") - expect_equal(sum, 55) - - # Also test with an inline function - sumInline <- reduce(rdd, function(x, y) { x + y }) - expect_equal(sumInline, 55) -}) - -test_that("lapply with dependency", { - fa <- 5 - multiples <- lapply(rdd, function(x) { fa * x }) - actual <- collectRDD(multiples) - - expect_equal(actual, as.list(nums * 5)) -}) - -test_that("lapplyPartitionsWithIndex on RDDs", { - func <- function(partIndex, part) { list(partIndex, Reduce("+", part)) } - actual <- collectRDD(lapplyPartitionsWithIndex(rdd, func), flatten = FALSE) - expect_equal(actual, list(list(0, 15), list(1, 40))) - - pairsRDD <- parallelize(sc, list(list(1, 2), list(3, 4), list(4, 8)), 1L) - partitionByParity <- function(key) { if (key %% 2 == 1) 0 else 1 } - mkTup <- function(partIndex, part) { list(partIndex, part) } - actual <- collectRDD(lapplyPartitionsWithIndex( - partitionByRDD(pairsRDD, 2L, partitionByParity), - mkTup), - FALSE) - expect_equal(actual, list(list(0, list(list(1, 2), list(3, 4))), - list(1, list(list(4, 8))))) -}) - -test_that("sampleRDD() on RDDs", { - expect_equal(unlist(collectRDD(sampleRDD(rdd, FALSE, 1.0, 2014L))), nums) -}) - -test_that("takeSample() on RDDs", { - # ported from RDDSuite.scala, modified seeds - data <- parallelize(sc, 1:100, 2L) - for (seed in 4:5) { - s <- takeSample(data, FALSE, 20L, seed) - expect_equal(length(s), 20L) - expect_equal(length(unique(s)), 20L) - for (elem in s) { - expect_true(elem >= 1 && elem <= 100) - } - } - for (seed in 4:5) { - s <- takeSample(data, FALSE, 200L, seed) - expect_equal(length(s), 100L) - expect_equal(length(unique(s)), 100L) 
- for (elem in s) { - expect_true(elem >= 1 && elem <= 100) - } - } - for (seed in 4:5) { - s <- takeSample(data, TRUE, 20L, seed) - expect_equal(length(s), 20L) - for (elem in s) { - expect_true(elem >= 1 && elem <= 100) - } - } - for (seed in 4:5) { - s <- takeSample(data, TRUE, 100L, seed) - expect_equal(length(s), 100L) - # Chance of getting all distinct elements is astronomically low, so test we - # got less than 100 - expect_true(length(unique(s)) < 100L) - } - for (seed in 4:5) { - s <- takeSample(data, TRUE, 200L, seed) - expect_equal(length(s), 200L) - # Chance of getting all distinct elements is still quite low, so test we - # got less than 100 - expect_true(length(unique(s)) < 100L) - } -}) - -test_that("mapValues() on pairwise RDDs", { - multiples <- mapValues(intRdd, function(x) { x * 2 }) - actual <- collectRDD(multiples) - expected <- lapply(intPairs, function(x) { - list(x[[1]], x[[2]] * 2) - }) - expect_equal(sortKeyValueList(actual), sortKeyValueList(expected)) -}) - -test_that("flatMapValues() on pairwise RDDs", { - l <- parallelize(sc, list(list(1, c(1, 2)), list(2, c(3, 4)))) - actual <- collectRDD(flatMapValues(l, function(x) { x })) - expect_equal(actual, list(list(1, 1), list(1, 2), list(2, 3), list(2, 4))) - - # Generate x to x+1 for every value - actual <- collectRDD(flatMapValues(intRdd, function(x) { x: (x + 1) })) - expect_equal(actual, - list(list(1L, -1), list(1L, 0), list(2L, 100), list(2L, 101), - list(2L, 1), list(2L, 2), list(1L, 200), list(1L, 201))) -}) - -test_that("reduceByKeyLocally() on PairwiseRDDs", { - pairs <- parallelize(sc, list(list(1, 2), list(1.1, 3), list(1, 4)), 2L) - actual <- reduceByKeyLocally(pairs, "+") - expect_equal(sortKeyValueList(actual), - sortKeyValueList(list(list(1, 6), list(1.1, 3)))) - - pairs <- parallelize(sc, list(list("abc", 1.2), list(1.1, 0), list("abc", 1.3), - list("bb", 5)), 4L) - actual <- reduceByKeyLocally(pairs, "+") - expect_equal(sortKeyValueList(actual), - 
sortKeyValueList(list(list("abc", 2.5), list(1.1, 0), list("bb", 5)))) -}) - -test_that("distinct() on RDDs", { - nums.rep2 <- rep(1:10, 2) - rdd.rep2 <- parallelize(sc, nums.rep2, 2L) - uniques <- distinctRDD(rdd.rep2) - actual <- sort(unlist(collectRDD(uniques))) - expect_equal(actual, nums) -}) - -test_that("maximum() on RDDs", { - max <- maximum(rdd) - expect_equal(max, 10) -}) - -test_that("minimum() on RDDs", { - min <- minimum(rdd) - expect_equal(min, 1) -}) - -test_that("sumRDD() on RDDs", { - sum <- sumRDD(rdd) - expect_equal(sum, 55) -}) - -test_that("keyBy on RDDs", { - func <- function(x) { x * x } - keys <- keyBy(rdd, func) - actual <- collectRDD(keys) - expect_equal(actual, lapply(nums, function(x) { list(func(x), x) })) -}) - -test_that("repartition/coalesce on RDDs", { - rdd <- parallelize(sc, 1:20, 4L) # each partition contains 5 elements - - # repartition - r1 <- repartitionRDD(rdd, 2) - expect_equal(getNumPartitionsRDD(r1), 2L) - count <- length(collectPartition(r1, 0L)) - expect_true(count >= 8 && count <= 12) - - r2 <- repartitionRDD(rdd, 6) - expect_equal(getNumPartitionsRDD(r2), 6L) - count <- length(collectPartition(r2, 0L)) - expect_true(count >= 0 && count <= 4) - - # coalesce - r3 <- coalesceRDD(rdd, 1) - expect_equal(getNumPartitionsRDD(r3), 1L) - count <- length(collectPartition(r3, 0L)) - expect_equal(count, 20) -}) - -test_that("sortBy() on RDDs", { - sortedRdd <- sortBy(rdd, function(x) { x * x }, ascending = FALSE) - actual <- collectRDD(sortedRdd) - expect_equal(actual, as.list(sort(nums, decreasing = TRUE))) - - rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L) - sortedRdd2 <- sortBy(rdd2, function(x) { x * x }) - actual <- collectRDD(sortedRdd2) - expect_equal(actual, as.list(nums)) -}) - -test_that("takeOrdered() on RDDs", { - l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7) - rdd <- parallelize(sc, l) - actual <- takeOrdered(rdd, 6L) - expect_equal(actual, as.list(sort(unlist(l)))[1:6]) - - l <- list("e", "d", "c", "d", "a") - 
rdd <- parallelize(sc, l) - actual <- takeOrdered(rdd, 3L) - expect_equal(actual, as.list(sort(unlist(l)))[1:3]) -}) - -test_that("top() on RDDs", { - l <- list(10, 1, 2, 9, 3, 4, 5, 6, 7) - rdd <- parallelize(sc, l) - actual <- top(rdd, 6L) - expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:6]) - - l <- list("e", "d", "c", "d", "a") - rdd <- parallelize(sc, l) - actual <- top(rdd, 3L) - expect_equal(actual, as.list(sort(unlist(l), decreasing = TRUE))[1:3]) -}) - -test_that("fold() on RDDs", { - actual <- fold(rdd, 0, "+") - expect_equal(actual, Reduce("+", nums, 0)) - - rdd <- parallelize(sc, list()) - actual <- fold(rdd, 0, "+") - expect_equal(actual, 0) -}) - -test_that("aggregateRDD() on RDDs", { - rdd <- parallelize(sc, list(1, 2, 3, 4)) - zeroValue <- list(0, 0) - seqOp <- function(x, y) { list(x[[1]] + y, x[[2]] + 1) } - combOp <- function(x, y) { list(x[[1]] + y[[1]], x[[2]] + y[[2]]) } - actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp) - expect_equal(actual, list(10, 4)) - - rdd <- parallelize(sc, list()) - actual <- aggregateRDD(rdd, zeroValue, seqOp, combOp) - expect_equal(actual, list(0, 0)) -}) - -test_that("zipWithUniqueId() on RDDs", { - rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) - actual <- collectRDD(zipWithUniqueId(rdd)) - expected <- list(list("a", 0), list("b", 1), list("c", 4), - list("d", 2), list("e", 5)) - expect_equal(actual, expected) - - rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L) - actual <- collectRDD(zipWithUniqueId(rdd)) - expected <- list(list("a", 0), list("b", 1), list("c", 2), - list("d", 3), list("e", 4)) - expect_equal(actual, expected) -}) - -test_that("zipWithIndex() on RDDs", { - rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 3L) - actual <- collectRDD(zipWithIndex(rdd)) - expected <- list(list("a", 0), list("b", 1), list("c", 2), - list("d", 3), list("e", 4)) - expect_equal(actual, expected) - - rdd <- parallelize(sc, list("a", "b", "c", "d", "e"), 1L) - actual <- 
# Tests for basic RDD transformations and actions in SparkR.
# Uses harness globals defined elsewhere in the suite: sc, rdd, nums,
# intRdd, intPairs, sortKeyValueList().

# NOTE(review): the opening of this first test was truncated upstream; the
# test_that() header and `actual <-` assignment are reconstructed from the
# visible body — confirm against the original file.
test_that("zipWithIndex() on RDDs", {
  actual <- collectRDD(zipWithIndex(rdd))
  expected <- list(list("a", 0), list("b", 1), list("c", 2),
                   list("d", 3), list("e", 4))
  expect_equal(actual, expected)
})

test_that("glom() on RDD", {
  rdd <- parallelize(sc, as.list(1:4), 2L)
  actual <- collectRDD(glom(rdd))
  expect_equal(actual, list(list(1, 2), list(3, 4)))
})

test_that("keys() on RDDs", {
  keys <- keys(intRdd)
  actual <- collectRDD(keys)
  expect_equal(actual, lapply(intPairs, function(x) { x[[1]] }))
})

test_that("values() on RDDs", {
  values <- values(intRdd)
  actual <- collectRDD(values)
  expect_equal(actual, lapply(intPairs, function(x) { x[[2]] }))
})

test_that("pipeRDD() on RDDs", {
  actual <- collectRDD(pipeRDD(rdd, "more"))
  expected <- as.list(as.character(1:10))
  expect_equal(actual, expected)

  # Trailing whitespace/newlines in elements are stripped by the pipe.
  trailed.rdd <- parallelize(sc, c("1", "", "2\n", "3\n\r\n"))
  actual <- collectRDD(pipeRDD(trailed.rdd, "sort"))
  expected <- list("", "1", "2", "3")
  expect_equal(actual, expected)

  # "sort" runs per partition, so each partition's output is sorted
  # independently.
  rev.nums <- 9:0
  rev.rdd <- parallelize(sc, rev.nums, 2L)
  actual <- collectRDD(pipeRDD(rev.rdd, "sort"))
  expected <- as.list(as.character(c(5:9, 0:4)))
  expect_equal(actual, expected)
})

test_that("zipRDD() on RDDs", {
  rdd1 <- parallelize(sc, 0:4, 2)
  rdd2 <- parallelize(sc, 1000:1004, 2)
  actual <- collectRDD(zipRDD(rdd1, rdd2))
  expect_equal(actual,
               list(list(0, 1000), list(1, 1001), list(2, 1002),
                    list(3, 1003), list(4, 1004)))

  mockFile <- c("Spark is pretty.", "Spark is awesome.")
  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, fileName)

  # Zipping a text-file RDD with itself.
  rdd <- textFile(sc, fileName, 1)
  actual <- collectRDD(zipRDD(rdd, rdd))
  expected <- lapply(mockFile, function(x) { list(x, x) })
  expect_equal(actual, expected)

  rdd1 <- parallelize(sc, 0:1, 1)
  actual <- collectRDD(zipRDD(rdd1, rdd))
  expected <- lapply(0:1, function(x) { list(x, mockFile[x + 1]) })
  expect_equal(actual, expected)

  rdd1 <- map(rdd, function(x) { x })
  actual <- collectRDD(zipRDD(rdd, rdd1))
  expected <- lapply(mockFile, function(x) { list(x, x) })
  expect_equal(actual, expected)

  unlink(fileName)
})

test_that("cartesian() on RDDs", {
  rdd <- parallelize(sc, 1:3)
  actual <- collectRDD(cartesian(rdd, rdd))
  expect_equal(sortKeyValueList(actual),
               list(
                 list(1, 1), list(1, 2), list(1, 3),
                 list(2, 1), list(2, 2), list(2, 3),
                 list(3, 1), list(3, 2), list(3, 3)))

  # test case where one RDD is empty
  emptyRdd <- parallelize(sc, list())
  actual <- collectRDD(cartesian(rdd, emptyRdd))
  expect_equal(actual, list())

  mockFile <- c("Spark is pretty.", "Spark is awesome.")
  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
  writeLines(mockFile, fileName)

  rdd <- textFile(sc, fileName)
  actual <- collectRDD(cartesian(rdd, rdd))
  expected <- list(
    list("Spark is awesome.", "Spark is pretty."),
    list("Spark is awesome.", "Spark is awesome."),
    list("Spark is pretty.", "Spark is pretty."),
    list("Spark is pretty.", "Spark is awesome."))
  expect_equal(sortKeyValueList(actual), expected)

  rdd1 <- parallelize(sc, 0:1)
  actual <- collectRDD(cartesian(rdd1, rdd))
  expect_equal(sortKeyValueList(actual),
               list(
                 list(0, "Spark is pretty."),
                 list(0, "Spark is awesome."),
                 list(1, "Spark is pretty."),
                 list(1, "Spark is awesome.")))

  rdd1 <- map(rdd, function(x) { x })
  actual <- collectRDD(cartesian(rdd, rdd1))
  expect_equal(sortKeyValueList(actual), expected)

  unlink(fileName)
})

test_that("subtract() on RDDs", {
  l <- list(1, 1, 2, 2, 3, 4)
  rdd1 <- parallelize(sc, l)

  # subtract by itself
  actual <- collectRDD(subtract(rdd1, rdd1))
  expect_equal(actual, list())

  # subtract by an empty RDD
  rdd2 <- parallelize(sc, list())
  actual <- collectRDD(subtract(rdd1, rdd2))
  expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
               l)

  rdd2 <- parallelize(sc, list(2, 4))
  actual <- collectRDD(subtract(rdd1, rdd2))
  expect_equal(as.list(sort(as.vector(actual, mode = "integer"))),
               list(1, 1, 3))

  l <- list("a", "a", "b", "b", "c", "d")
  rdd1 <- parallelize(sc, l)
  rdd2 <- parallelize(sc, list("b", "d"))
  actual <- collectRDD(subtract(rdd1, rdd2))
  expect_equal(as.list(sort(as.vector(actual, mode = "character"))),
               list("a", "a", "c"))
})

test_that("subtractByKey() on pairwise RDDs", {
  l <- list(list("a", 1), list("b", 4),
            list("b", 5), list("a", 2))
  rdd1 <- parallelize(sc, l)

  # subtractByKey by itself
  actual <- collectRDD(subtractByKey(rdd1, rdd1))
  expect_equal(actual, list())

  # subtractByKey by an empty RDD
  rdd2 <- parallelize(sc, list())
  actual <- collectRDD(subtractByKey(rdd1, rdd2))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(l))

  rdd2 <- parallelize(sc, list(list("a", 3), list("c", 1)))
  actual <- collectRDD(subtractByKey(rdd1, rdd2))
  expect_equal(actual,
               list(list("b", 4), list("b", 5)))

  l <- list(list(1, 1), list(2, 4),
            list(2, 5), list(1, 2))
  rdd1 <- parallelize(sc, l)
  rdd2 <- parallelize(sc, list(list(1, 3), list(3, 1)))
  actual <- collectRDD(subtractByKey(rdd1, rdd2))
  expect_equal(actual,
               list(list(2, 4), list(2, 5)))
})

test_that("intersection() on RDDs", {
  # intersection with self
  actual <- collectRDD(intersection(rdd, rdd))
  expect_equal(sort(as.integer(actual)), nums)

  # intersection with an empty RDD
  emptyRdd <- parallelize(sc, list())
  actual <- collectRDD(intersection(rdd, emptyRdd))
  expect_equal(actual, list())

  rdd1 <- parallelize(sc, list(1, 10, 2, 3, 4, 5))
  rdd2 <- parallelize(sc, list(1, 6, 2, 3, 7, 8))
  actual <- collectRDD(intersection(rdd1, rdd2))
  expect_equal(sort(as.integer(actual)), 1:3)
})

test_that("join() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(1, list(1, 2)), list(1, list(1, 3)))))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("a", list(1, 2)), list("a", list(1, 3)))))

  # Disjoint key sets produce an empty join result.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(actual, list())

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(joinRDD(rdd1, rdd2, 2L))
  expect_equal(actual, list())
})

test_that("leftOuterJoin() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(1, 2)), list(1, list(1, 3)), list(2, list(4, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  rdd2 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(4, NULL)), list("a", list(1, 2)), list("a", list(1, 3)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  # Unmatched left keys pair with NULL.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(1, NULL)), list(2, list(2, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(leftOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(2, NULL)), list("a", list(1, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))
})

test_that("rightOuterJoin() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3)))
  rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)), list(2, list(NULL, 4)))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3)))
  rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)), list("a", list(3, 1)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  # Unmatched right keys pair with NULL on the left.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(rightOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("fullOuterJoin() on pairwise RDDs", {
  rdd1 <- parallelize(sc, list(list(1, 2), list(1, 3), list(3, 3)))
  rdd2 <- parallelize(sc, list(list(1, 1), list(2, 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list(1, list(2, 1)), list(1, list(3, 1)),
                   list(2, list(NULL, 4)), list(3, list(3, NULL)))
  expect_equal(sortKeyValueList(actual), sortKeyValueList(expected))

  rdd1 <- parallelize(sc, list(list("a", 2), list("a", 3), list("c", 1)))
  rdd2 <- parallelize(sc, list(list("a", 1), list("b", 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expected <- list(list("b", list(NULL, 4)), list("a", list(2, 1)),
                   list("a", list(3, 1)), list("c", list(1, NULL)))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(expected))

  # Disjoint key sets: every key appears once, padded with NULL.
  rdd1 <- parallelize(sc, list(list(1, 1), list(2, 2)))
  rdd2 <- parallelize(sc, list(list(3, 3), list(4, 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list(1, list(1, NULL)), list(2, list(2, NULL)),
                                     list(3, list(NULL, 3)), list(4, list(NULL, 4)))))

  rdd1 <- parallelize(sc, list(list("a", 1), list("b", 2)))
  rdd2 <- parallelize(sc, list(list("c", 3), list("d", 4)))
  actual <- collectRDD(fullOuterJoin(rdd1, rdd2, 2L))
  expect_equal(sortKeyValueList(actual),
               sortKeyValueList(list(list("a", list(1, NULL)), list("b", list(2, NULL)),
                                     list("d", list(NULL, 4)), list("c", list(NULL, 3)))))
})

test_that("sortByKey() on pairwise RDDs", {
  numPairsRdd <- map(rdd, function(x) { list(x, x) })
  sortedRdd <- sortByKey(numPairsRdd, ascending = FALSE)
  actual <- collectRDD(sortedRdd)
  numPairs <- lapply(nums, function(x) { list(x, x) })
  expect_equal(actual, sortKeyValueList(numPairs, decreasing = TRUE))

  rdd2 <- parallelize(sc, sort(nums, decreasing = TRUE), 2L)
  numPairsRdd2 <- map(rdd2, function(x) { list(x, x) })
  sortedRdd2 <- sortByKey(numPairsRdd2)
  actual <- collectRDD(sortedRdd2)
  expect_equal(actual, numPairs)

  # sort by string keys
  l <- list(list("a", 1), list("b", 2), list("1", 3), list("d", 4), list("2", 5))
  rdd3 <- parallelize(sc, l, 2L)
  sortedRdd3 <- sortByKey(rdd3)
  actual <- collectRDD(sortedRdd3)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # test on the boundary cases

  # boundary case 1: the RDD to be sorted has only 1 partition
  rdd4 <- parallelize(sc, l, 1L)
  sortedRdd4 <- sortByKey(rdd4)
  actual <- collectRDD(sortedRdd4)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # boundary case 2: the sorted RDD has only 1 partition
  rdd5 <- parallelize(sc, l, 2L)
  sortedRdd5 <- sortByKey(rdd5, numPartitions = 1L)
  actual <- collectRDD(sortedRdd5)
  expect_equal(actual, list(list("1", 3), list("2", 5), list("a", 1), list("b", 2), list("d", 4)))

  # boundary case 3: the RDD to be sorted has only 1 element
  l2 <- list(list("a", 1))
  rdd6 <- parallelize(sc, l2, 2L)
  sortedRdd6 <- sortByKey(rdd6)
  actual <- collectRDD(sortedRdd6)
  expect_equal(actual, l2)

  # boundary case 4: the RDD to be sorted has 0 element
  l3 <- list()
  rdd7 <- parallelize(sc, l3, 2L)
  sortedRdd7 <- sortByKey(rdd7)
  actual <- collectRDD(sortedRdd7)
  expect_equal(actual, l3)
})

test_that("collectAsMap() on a pairwise RDD", {
  rdd <- parallelize(sc, list(list(1, 2), list(3, 4)))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(`1` = 2, `3` = 4))

  rdd <- parallelize(sc, list(list("a", 1), list("b", 2)))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(a = 1, b = 2))

  rdd <- parallelize(sc, list(list(1.1, 2.2), list(1.2, 2.4)))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(`1.1` = 2.2, `1.2` = 2.4))

  rdd <- parallelize(sc, list(list(1, "a"), list(2, "b")))
  vals <- collectAsMap(rdd)
  expect_equal(vals, list(`1` = "a", `2` = "b"))
})

test_that("show()", {
  rdd <- parallelize(sc, list(1:10))
  expect_output(showRDD(rdd), "ParallelCollectionRDD\\[\\d+\\] at parallelize at RRDD\\.scala:\\d+")
})

test_that("sampleByKey() on pairwise RDDs", {
  rdd <- parallelize(sc, 1:2000)
  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list("a", x) else list("b", x) })
  fractions <- list(a = 0.2, b = 0.1)
  sample <- sampleByKey(pairsRDD, FALSE, fractions, 1618L)
  # Sampled counts should land near fraction * 1000 per key.
  expect_equal(100 < length(lookup(sample, "a")) && 300 > length(lookup(sample, "a")), TRUE)
  expect_equal(50 < length(lookup(sample, "b")) && 150 > length(lookup(sample, "b")), TRUE)
  expect_equal(lookup(sample, "a")[which.min(lookup(sample, "a"))] >= 0, TRUE)
  expect_equal(lookup(sample, "a")[which.max(lookup(sample, "a"))] <= 2000, TRUE)
  expect_equal(lookup(sample, "b")[which.min(lookup(sample, "b"))] >= 0, TRUE)
  expect_equal(lookup(sample, "b")[which.max(lookup(sample, "b"))] <= 2000, TRUE)

  rdd <- parallelize(sc, 1:2000)
  pairsRDD <- lapply(rdd, function(x) { if (x %% 2 == 0) list(2, x) else list(3, x) })
  fractions <- list(`2` = 0.2, `3` = 0.1)
  sample <- sampleByKey(pairsRDD, TRUE, fractions, 1618L)
  expect_equal(100 < length(lookup(sample, 2)) && 300 > length(lookup(sample, 2)), TRUE)
  expect_equal(50 < length(lookup(sample, 3)) && 150 > length(lookup(sample, 3)), TRUE)
  expect_equal(lookup(sample, 2)[which.min(lookup(sample, 2))] >= 0, TRUE)
  expect_equal(lookup(sample, 2)[which.max(lookup(sample, 2))] <= 2000, TRUE)
  expect_equal(lookup(sample, 3)[which.min(lookup(sample, 3))] >= 0, TRUE)
  expect_equal(lookup(sample, 3)[which.max(lookup(sample, 3))] <= 2000, TRUE)
})

test_that("Test correct concurrency of RRDD.compute()", {
  rdd <- parallelize(sc, 1:1000, 100)
  jrdd <- getJRDD(lapply(rdd, function(x) { x }), "row")
  zrdd <- callJMethod(jrdd, "zip", jrdd)
  count <- callJMethod(zrdd, "count")
  expect_equal(count, 1000)
})

sparkR.session.stop()
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org