http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_take.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_take.R b/R/pkg/inst/tests/testthat/test_take.R deleted file mode 100644 index aaa5328..0000000 --- a/R/pkg/inst/tests/testthat/test_take.R +++ /dev/null @@ -1,69 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("tests RDD function take()") - -# Mock data -numVector <- c(-10:97) -numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10) -strVector <- c("Dexter Morgan: I suppose I should be upset, even feel", - "violated, but I'm not. No, in fact, I think this is a friendly", - "message, like \"Hey, wanna play?\" and yes, I want to play. ", - "I really, really do.") -strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ", - "other times it helps me control the chaos.", - "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ", - "raising me. But they're both dead now. I didn't kill them. Honest.") - -# JavaSparkContext handle -sparkSession <- sparkR.session(enableHiveSupport = FALSE) -sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -test_that("take() gives back the original elements in correct count and order", { - numVectorRDD <- parallelize(sc, numVector, 10) - # case: number of elements to take is less than the size of the first partition - expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1))) - # case: number of elements to take is the same as the size of the first partition - expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11))) - # case: number of elements to take is greater than all elements - expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector)) - expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), as.list(numVector)) - - numListRDD <- parallelize(sc, numList, 1) - numListRDD2 <- parallelize(sc, numList, 4) - expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3)) - expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5)) - expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1))) - expect_equal(takeRDD(numListRDD2, 999), numList) - - strVectorRDD <- parallelize(sc, strVector, 2) - strVectorRDD2 <- parallelize(sc, strVector, 3) - expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector)) - expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2))) - - strListRDD <- parallelize(sc, strList, 4) - strListRDD2 <- parallelize(sc, strList, 1) - expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3))) - expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1))) - - expect_equal(length(takeRDD(strListRDD, 0)), 0) - expect_equal(length(takeRDD(strVectorRDD, 0)), 0) - expect_equal(length(takeRDD(numListRDD, 0)), 0) - expect_equal(length(takeRDD(numVectorRDD, 0)), 0) -}) - -sparkR.session.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_textFile.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_textFile.R b/R/pkg/inst/tests/testthat/test_textFile.R deleted file mode 100644 index 3b46606..0000000 --- a/R/pkg/inst/tests/testthat/test_textFile.R +++ /dev/null @@ -1,164 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("the textFile() function") - -# JavaSparkContext handle -sparkSession <- sparkR.session(enableHiveSupport = FALSE) -sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -mockFile <- c("Spark is pretty.", "Spark is awesome.") - -test_that("textFile() on a local file returns an RDD", { - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName) - - rdd <- textFile(sc, fileName) - expect_is(rdd, "RDD") - expect_true(countRDD(rdd) > 0) - expect_equal(countRDD(rdd), 2) - - unlink(fileName) -}) - -test_that("textFile() followed by a collect() returns the same content", { - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName) - - rdd <- textFile(sc, fileName) - expect_equal(collectRDD(rdd), as.list(mockFile)) - - unlink(fileName) -}) - -test_that("textFile() word count works as expected", { - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName) - - rdd <- textFile(sc, fileName) - - words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) - wordCount <- lapply(words, function(word) { list(word, 1L) }) - - counts <- reduceByKey(wordCount, "+", 2L) - output <- collectRDD(counts) - expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1), - list("Spark", 2)) - expect_equal(sortKeyValueList(output), sortKeyValueList(expected)) - - unlink(fileName) -}) - -test_that("several transformations on RDD created by textFile()", { - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName) - - rdd <- textFile(sc, fileName) # RDD - for (i in 1:10) { - # PipelinedRDD initially created from RDD - rdd <- lapply(rdd, function(x) paste(x, x)) - } - collectRDD(rdd) - - unlink(fileName) -}) - -test_that("textFile() followed by a saveAsTextFile() returns the same content", { - fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") - fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName1) - - rdd <- textFile(sc, fileName1, 1L) - saveAsTextFile(rdd, fileName2) - rdd <- textFile(sc, fileName2) - expect_equal(collectRDD(rdd), as.list(mockFile)) - - unlink(fileName1) - unlink(fileName2) -}) - -test_that("saveAsTextFile() on a parallelized list works as expected", { - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - l <- list(1, 2, 3) - rdd <- parallelize(sc, l, 1L) - saveAsTextFile(rdd, fileName) - rdd <- textFile(sc, fileName) - expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)})) - - unlink(fileName) -}) - -test_that("textFile() and saveAsTextFile() word count works as expected", { - fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") - fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName1) - - rdd <- textFile(sc, fileName1) - - words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) - wordCount <- lapply(words, function(word) { list(word, 1L) }) - - counts <- reduceByKey(wordCount, "+", 2L) - - saveAsTextFile(counts, fileName2) - rdd <- textFile(sc, fileName2) - - output <- collectRDD(rdd) - expected <- list(list("awesome.", 1), list("Spark", 2), - list("pretty.", 1), list("is", 2)) - expectedStr <- lapply(expected, function(x) { toString(x) }) - expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr)) - - unlink(fileName1) - unlink(fileName2) -}) - -test_that("textFile() on multiple paths", { - fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") - fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines("Spark is pretty.", fileName1) - writeLines("Spark is awesome.", fileName2) - - rdd <- textFile(sc, c(fileName1, fileName2)) - expect_equal(countRDD(rdd), 2) - - unlink(fileName1) - unlink(fileName2) -}) - -test_that("Pipelined operations on RDDs created using textFile", { - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName) - - rdd <- textFile(sc, fileName) - - lengths <- lapply(rdd, function(x) { length(x) }) - expect_equal(collectRDD(lengths), list(1, 1)) - - lengthsPipelined <- lapply(lengths, function(x) { x + 10 }) - expect_equal(collectRDD(lengthsPipelined), list(11, 11)) - - lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 }) - expect_equal(collectRDD(lengths30), list(31, 31)) - - lengths20 <- lapply(lengths, function(x) { x + 20 }) - expect_equal(collectRDD(lengths20), list(21, 21)) - - unlink(fileName) -}) - -sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/inst/tests/testthat/test_utils.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R deleted file mode 100644 index c875248..0000000 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ /dev/null @@ -1,242 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -context("functions in utils.R") - -# JavaSparkContext handle -sparkSession <- sparkR.session(enableHiveSupport = FALSE) -sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) - -test_that("convertJListToRList() gives back (deserializes) the original JLists - of strings and integers", { - # It's hard to manually create a Java List using rJava, since it does not - # support generics well. Instead, we rely on collectRDD() returning a - # JList. - nums <- as.list(1:10) - rdd <- parallelize(sc, nums, 1L) - jList <- callJMethod(rdd@jrdd, "collect") - rList <- convertJListToRList(jList, flatten = TRUE) - expect_equal(rList, nums) - - strs <- as.list("hello", "spark") - rdd <- parallelize(sc, strs, 2L) - jList <- callJMethod(rdd@jrdd, "collect") - rList <- convertJListToRList(jList, flatten = TRUE) - expect_equal(rList, strs) -}) - -test_that("serializeToBytes on RDD", { - # File content - mockFile <- c("Spark is pretty.", "Spark is awesome.") - fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") - writeLines(mockFile, fileName) - - text.rdd <- textFile(sc, fileName) - expect_equal(getSerializedMode(text.rdd), "string") - ser.rdd <- serializeToBytes(text.rdd) - expect_equal(collectRDD(ser.rdd), as.list(mockFile)) - expect_equal(getSerializedMode(ser.rdd), "byte") - - unlink(fileName) -}) - -test_that("cleanClosure on R functions", { - y <- c(1, 2, 3) - g <- function(x) { x + 1 } - f <- function(x) { g(x) + y } - newF <- cleanClosure(f) - env <- environment(newF) - expect_equal(length(ls(env)), 2) # y, g - actual <- get("y", envir = env, inherits = FALSE) - expect_equal(actual, y) - actual <- get("g", envir = env, inherits = FALSE) - expect_equal(actual, g) - - # Test for nested enclosures and package variables. - env2 <- new.env() - funcEnv <- new.env(parent = env2) - f <- function(x) { log(g(x) + y) } - environment(f) <- funcEnv # enclosing relationship: f -> funcEnv -> env2 -> .GlobalEnv - newF <- cleanClosure(f) - env <- environment(newF) - expect_equal(length(ls(env)), 2) # "min" should not be included - actual <- get("y", envir = env, inherits = FALSE) - expect_equal(actual, y) - actual <- get("g", envir = env, inherits = FALSE) - expect_equal(actual, g) - - base <- c(1, 2, 3) - l <- list(field = matrix(1)) - field <- matrix(2) - defUse <- 3 - g <- function(x) { x + y } - f <- function(x) { - defUse <- base::as.integer(x) + 1 # Test for access operators `::`. - lapply(x, g) + 1 # Test for capturing function call "g"'s closure as a argument of lapply. - l$field[1, 1] <- 3 # Test for access operators `$`. - res <- defUse + l$field[1, ] # Test for def-use chain of "defUse", and "" symbol. - f(res) # Test for recursive calls. - } - newF <- cleanClosure(f) - env <- environment(newF) - # TODO(shivaram): length(ls(env)) is 4 here for some reason and `lapply` is included in `env`. - # Disabling this test till we debug this. - # - # nolint start - # expect_equal(length(ls(env)), 3) # Only "g", "l" and "f". No "base", "field" or "defUse". - # nolint end - expect_true("g" %in% ls(env)) - expect_true("l" %in% ls(env)) - expect_true("f" %in% ls(env)) - expect_equal(get("l", envir = env, inherits = FALSE), l) - # "y" should be in the environemnt of g. - newG <- get("g", envir = env, inherits = FALSE) - env <- environment(newG) - expect_equal(length(ls(env)), 1) - actual <- get("y", envir = env, inherits = FALSE) - expect_equal(actual, y) - - # Test for function (and variable) definitions. - f <- function(x) { - g <- function(y) { y * 2 } - g(x) - } - newF <- cleanClosure(f) - env <- environment(newF) - expect_equal(length(ls(env)), 0) # "y" and "g" should not be included. - - # Test for overriding variables in base namespace (Issue: SparkR-196). - nums <- as.list(1:10) - rdd <- parallelize(sc, nums, 2L) - t <- 4 # Override base::t in .GlobalEnv. - f <- function(x) { x > t } - newF <- cleanClosure(f) - env <- environment(newF) - expect_equal(ls(env), "t") - expect_equal(get("t", envir = env, inherits = FALSE), t) - actual <- collectRDD(lapply(rdd, f)) - expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6))) - expect_equal(actual, expected) - - # Test for broadcast variables. - a <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) - aBroadcast <- broadcast(sc, a) - normMultiply <- function(x) { norm(aBroadcast$value) * x } - newnormMultiply <- SparkR:::cleanClosure(normMultiply) - env <- environment(newnormMultiply) - expect_equal(ls(env), "aBroadcast") - expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast) -}) - -test_that("varargsToJProperties", { - jprops <- newJObject("java.util.Properties") - expect_true(class(jprops) == "jobj") - - jprops <- varargsToJProperties(abc = "123") - expect_true(class(jprops) == "jobj") - expect_equal(callJMethod(jprops, "getProperty", "abc"), "123") - - jprops <- varargsToJProperties(abc = "abc", b = 1) - expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc") - expect_equal(callJMethod(jprops, "getProperty", "b"), "1") - - jprops <- varargsToJProperties() - expect_equal(callJMethod(jprops, "size"), 0L) -}) - -test_that("convertToJSaveMode", { - s <- convertToJSaveMode("error") - expect_true(class(s) == "jobj") - expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ") - expect_error(convertToJSaveMode("foo"), - 'mode should be one of "append", "overwrite", "error", "ignore"') #nolint -}) - -test_that("captureJVMException", { - method <- "getSQLDataType" - expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method, - "unknown"), - error = function(e) { - captureJVMException(e, method) - }), - "Error in getSQLDataType : illegal argument - Invalid type unknown") -}) - -test_that("hashCode", { - expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA) -}) - -test_that("overrideEnvs", { - config <- new.env() - config[["spark.master"]] <- "foo" - config[["config_only"]] <- "ok" - param <- new.env() - param[["spark.master"]] <- "local" - param[["param_only"]] <- "blah" - overrideEnvs(config, param) - expect_equal(config[["spark.master"]], "local") - expect_equal(config[["param_only"]], "blah") - expect_equal(config[["config_only"]], "ok") -}) - -test_that("rbindRaws", { - - # Mixed Column types - r <- serialize(1:5, connection = NULL) - r1 <- serialize(1, connection = NULL) - r2 <- serialize(letters, connection = NULL) - r3 <- serialize(1:10, connection = NULL) - inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r), - list(3L, r3, "c", r)) - expected <- data.frame(V1 = 1:3) - expected$V2 <- list(r1, r2, r3) - expected$V3 <- c("a", "b", "c") - expected$V4 <- list(r, r, r) - result <- rbindRaws(inputData) - expect_equal(expected, result) - - # Single binary column - input <- list(list(r1), list(r2), list(r3)) - expected <- subset(expected, select = "V2") - result <- setNames(rbindRaws(input), "V2") - expect_equal(expected, result) - -}) - -test_that("varargsToStrEnv", { - strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd") - env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd") - expect_equal(strenv, env) - expect_error(varargsToStrEnv(a = list(1, "a")), - paste0("Unsupported type for a : list. Supported types are logical, ", - "numeric, character and NULL.")) - expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 2, 3, 4.") - expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.") -}) - -test_that("basenameSansExtFromUrl", { - x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-", - "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz") - y <- paste0("http://people.apache.org/~pwendell/spark-releases/spark-2.1.0-rc2-bin/spark-2.1.0-", - "bin-hadoop2.4-without-hive.tgz") - expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7") - expect_equal(basenameSansExtFromUrl(y), "spark-2.1.0-bin-hadoop2.4-without-hive") - z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz" - expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive") -}) - -sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/jarTest.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/jarTest.R b/R/pkg/tests/fulltests/jarTest.R new file mode 100644 index 0000000..c9615c8 --- /dev/null +++ b/R/pkg/tests/fulltests/jarTest.R @@ -0,0 +1,32 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +library(SparkR) + +sc <- sparkR.session() + +helloTest <- SparkR:::callJStatic("sparkrtest.DummyClass", + "helloWorld", + "Dave") +stopifnot(identical(helloTest, "Hello Dave")) + +basicFunction <- SparkR:::callJStatic("sparkrtest.DummyClass", + "addStuff", + 2L, + 2L) +stopifnot(basicFunction == 4L) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/packageInAJarTest.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/packageInAJarTest.R b/R/pkg/tests/fulltests/packageInAJarTest.R new file mode 100644 index 0000000..4bc935c --- /dev/null +++ b/R/pkg/tests/fulltests/packageInAJarTest.R @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +library(SparkR) +library(sparkPackageTest) + +sparkR.session() + +run1 <- myfunc(5L) + +run2 <- myfunc(-4L) + +sparkR.session.stop() + +if (run1 != 6) quit(save = "no", status = 1) + +if (run2 != -3) quit(save = "no", status = 1) http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_Serde.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_Serde.R b/R/pkg/tests/fulltests/test_Serde.R new file mode 100644 index 0000000..b5f6f1b --- /dev/null +++ b/R/pkg/tests/fulltests/test_Serde.R @@ -0,0 +1,79 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("SerDe functionality") + +sparkSession <- sparkR.session(enableHiveSupport = FALSE) + +test_that("SerDe of primitive types", { + x <- callJStatic("SparkRHandler", "echo", 1L) + expect_equal(x, 1L) + expect_equal(class(x), "integer") + + x <- callJStatic("SparkRHandler", "echo", 1) + expect_equal(x, 1) + expect_equal(class(x), "numeric") + + x <- callJStatic("SparkRHandler", "echo", TRUE) + expect_true(x) + expect_equal(class(x), "logical") + + x <- callJStatic("SparkRHandler", "echo", "abc") + expect_equal(x, "abc") + expect_equal(class(x), "character") +}) + +test_that("SerDe of list of primitive types", { + x <- list(1L, 2L, 3L) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "integer") + + x <- list(1, 2, 3) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "numeric") + + x <- list(TRUE, FALSE) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "logical") + + x <- list("a", "b", "c") + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + expect_equal(class(y[[1]]), "character") + + # Empty list + x <- list() + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) +}) + +test_that("SerDe of list of lists", { + x <- list(list(1L, 2L, 3L), list(1, 2, 3), + list(TRUE, FALSE), list("a", "b", "c")) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) + + # List of empty lists + x <- list(list(), list()) + y <- callJStatic("SparkRHandler", "echo", x) + expect_equal(x, y) +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_Windows.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_Windows.R b/R/pkg/tests/fulltests/test_Windows.R new file mode 100644 index 0000000..1d777dd --- /dev/null +++ b/R/pkg/tests/fulltests/test_Windows.R @@ -0,0 +1,27 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +context("Windows-specific tests") + +test_that("sparkJars tag in SparkContext", { + if (.Platform$OS.type != "windows") { + skip("This test is only for Windows, skipped") + } + + testOutput <- launchScript("ECHO", "a/b/c", wait = TRUE) + abcPath <- testOutput[1] + expect_equal(abcPath, "a\\b\\c") +}) http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_binaryFile.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_binaryFile.R b/R/pkg/tests/fulltests/test_binaryFile.R new file mode 100644 index 0000000..b5c279e --- /dev/null +++ b/R/pkg/tests/fulltests/test_binaryFile.R @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions on binary files") + +# JavaSparkContext handle +sparkSession <- sparkR.session(enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("saveAsObjectFile()/objectFile() following textFile() works", { + fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") + fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1, 1) + saveAsObjectFile(rdd, fileName2) + rdd <- objectFile(sc, fileName2) + expect_equal(collectRDD(rdd), as.list(mockFile)) + + unlink(fileName1) + unlink(fileName2, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() works on a parallelized list", { + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + + l <- list(1, 2, 3) + rdd <- parallelize(sc, l, 1) + saveAsObjectFile(rdd, fileName) + rdd <- objectFile(sc, fileName) + expect_equal(collectRDD(rdd), l) + + unlink(fileName, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() following RDD transformations works", { + fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") + fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1) + + words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + counts <- reduceByKey(wordCount, "+", 2L) + + saveAsObjectFile(counts, fileName2) + counts <- objectFile(sc, fileName2) + + output <- collectRDD(counts) + expected <- list(list("awesome.", 1), list("Spark", 2), list("pretty.", 1), + list("is", 2)) + expect_equal(sortKeyValueList(output), sortKeyValueList(expected)) + + unlink(fileName1) + unlink(fileName2, recursive = TRUE) +}) + +test_that("saveAsObjectFile()/objectFile() works with multiple paths", { + fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") + fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") + + rdd1 <- parallelize(sc, "Spark is pretty.") + saveAsObjectFile(rdd1, fileName1) + rdd2 <- parallelize(sc, "Spark is awesome.") + saveAsObjectFile(rdd2, fileName2) + + rdd <- objectFile(sc, c(fileName1, fileName2)) + expect_equal(countRDD(rdd), 2) + + unlink(fileName1, recursive = TRUE) + unlink(fileName2, recursive = TRUE) +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_binary_function.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_binary_function.R b/R/pkg/tests/fulltests/test_binary_function.R new file mode 100644 index 0000000..59cb2e6 --- /dev/null +++ b/R/pkg/tests/fulltests/test_binary_function.R @@ -0,0 +1,104 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("binary functions") + +# JavaSparkContext handle +sparkSession <- sparkR.session(enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +# Data +nums <- 1:10 +rdd <- parallelize(sc, nums, 2L) + +# File content +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("union on two RDDs", { + actual <- collectRDD(unionRDD(rdd, rdd)) + expect_equal(actual, as.list(rep(nums, 2))) + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + text.rdd <- textFile(sc, fileName) + union.rdd <- unionRDD(rdd, text.rdd) + actual <- collectRDD(union.rdd) + expect_equal(actual, c(as.list(nums), mockFile)) + expect_equal(getSerializedMode(union.rdd), "byte") + + rdd <- map(text.rdd, function(x) {x}) + union.rdd <- unionRDD(rdd, text.rdd) + actual <- collectRDD(union.rdd) + expect_equal(actual, as.list(c(mockFile, mockFile))) + expect_equal(getSerializedMode(union.rdd), "byte") + + unlink(fileName) +}) + +test_that("cogroup on two RDDs", { + rdd1 <- parallelize(sc, list(list(1, 1), list(2, 4))) + rdd2 <- parallelize(sc, list(list(1, 2), list(1, 3))) + cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L) + actual <- collectRDD(cogroup.rdd) + expect_equal(actual, + list(list(1, list(list(1), list(2, 3))), list(2, list(list(4), list())))) + + rdd1 <- parallelize(sc, list(list("a", 1), list("a", 4))) + rdd2 <- parallelize(sc, list(list("b", 2), list("a", 3))) + cogroup.rdd <- cogroup(rdd1, rdd2, numPartitions = 2L) + actual <- collectRDD(cogroup.rdd) + + expected <- list(list("b", list(list(), list(2))), list("a", list(list(1, 4), list(3)))) + expect_equal(sortKeyValueList(actual), + sortKeyValueList(expected)) +}) + +test_that("zipPartitions() on RDDs", { + rdd1 <- parallelize(sc, 1:2, 2L) # 1, 2 + rdd2 <- parallelize(sc, 1:4, 2L) # 1:2, 3:4 + rdd3 <- parallelize(sc, 1:6, 2L) # 1:3, 4:6 + actual <- collectRDD(zipPartitions(rdd1, rdd2, rdd3, + func = function(x, y, z) { list(list(x, y, z))} )) + expect_equal(actual, + list(list(1, c(1, 2), c(1, 2, 3)), list(2, c(3, 4), c(4, 5, 6)))) + + mockFile <- c("Spark is pretty.", "Spark is awesome.") + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName, 1) + actual <- collectRDD(zipPartitions(rdd, rdd, + func = function(x, y) { list(paste(x, y, sep = "\n")) })) + expected <- list(paste(mockFile, mockFile, sep = "\n")) + expect_equal(actual, expected) + + rdd1 <- parallelize(sc, 0:1, 1) + actual <- collectRDD(zipPartitions(rdd1, rdd, + func = function(x, y) { list(x + nchar(y)) })) + expected <- list(0:1 + nchar(mockFile)) + expect_equal(actual, expected) + + rdd <- map(rdd, function(x) { x }) + actual <- collectRDD(zipPartitions(rdd, rdd1, + func = function(x, y) { list(y + nchar(x)) })) + expect_equal(actual, expected) + + unlink(fileName) +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_broadcast.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_broadcast.R b/R/pkg/tests/fulltests/test_broadcast.R new file mode 100644 index 0000000..65f204d --- /dev/null +++ b/R/pkg/tests/fulltests/test_broadcast.R @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("broadcast variables") + +# JavaSparkContext handle +sparkSession <- sparkR.session(enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +# Partitioned data +nums <- 1:2 +rrdd <- parallelize(sc, nums, 2L) + +test_that("using broadcast variable", { + randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) + randomMatBr <- broadcast(sc, randomMat) + + useBroadcast <- function(x) { + sum(SparkR:::value(randomMatBr) * x) + } + actual <- collectRDD(lapply(rrdd, useBroadcast)) + expected <- list(sum(randomMat) * 1, sum(randomMat) * 2) + expect_equal(actual, expected) +}) + +test_that("without using broadcast variable", { + randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) + + useBroadcast <- function(x) { + sum(randomMat * x) + } + actual <- collectRDD(lapply(rrdd, useBroadcast)) + expected <- list(sum(randomMat) * 1, sum(randomMat) * 2) + expect_equal(actual, expected) +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_client.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_client.R b/R/pkg/tests/fulltests/test_client.R new file mode 100644 index 0000000..0cf25fe --- /dev/null +++ b/R/pkg/tests/fulltests/test_client.R @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions in client.R") + +test_that("adding spark-testing-base as a package works", { + args <- generateSparkSubmitArgs("", "", "", "", + "holdenk:spark-testing-base:1.3.0_0.0.5") + expect_equal(gsub("[[:space:]]", "", args), + gsub("[[:space:]]", "", + "--packages holdenk:spark-testing-base:1.3.0_0.0.5")) +}) + +test_that("no package specified doesn't add packages flag", { + args <- generateSparkSubmitArgs("", "", "", "", "") + expect_equal(gsub("[[:space:]]", "", args), + "") +}) + +test_that("multiple packages don't produce a warning", { + expect_warning(generateSparkSubmitArgs("", "", "", "", c("A", "B")), NA) +}) + +test_that("sparkJars sparkPackages as character vectors", { + args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "", + c("com.databricks:spark-avro_2.10:2.0.1")) + expect_match(args, "--jars one.jar,two.jar,three.jar") + expect_match(args, "--packages com.databricks:spark-avro_2.10:2.0.1") +}) http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_context.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R new file mode 100644 index 0000000..c847113 --- /dev/null +++ b/R/pkg/tests/fulltests/test_context.R @@ -0,0 +1,210 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("test functions in sparkR.R") + +test_that("Check masked functions", { + # Check that we are not masking any new function from base, stats, testthat unexpectedly + # NOTE: We should avoid adding entries to *namesOfMaskedCompletely* as masked functions make it + # hard for users to use base R functions. Please check when in doubt. + namesOfMaskedCompletely <- c("cov", "filter", "sample") + namesOfMasked <- c("describe", "cov", "filter", "lag", "na.omit", "predict", "sd", "var", + "colnames", "colnames<-", "intersect", "rank", "rbind", "sample", "subset", + "summary", "transform", "drop", "window", "as.data.frame", "union") + if (as.numeric(R.version$major) >= 3 && as.numeric(R.version$minor) >= 3) { + namesOfMasked <- c("endsWith", "startsWith", namesOfMasked) + } + masked <- conflicts(detail = TRUE)$`package:SparkR` + expect_true("describe" %in% masked) # only when with testthat.. + func <- lapply(masked, function(x) { capture.output(showMethods(x))[[1]] }) + funcSparkROrEmpty <- grepl("\\(package SparkR\\)$|^$", func) + maskedBySparkR <- masked[funcSparkROrEmpty] + expect_equal(length(maskedBySparkR), length(namesOfMasked)) + # make the 2 lists the same length so expect_equal will print their content + l <- max(length(maskedBySparkR), length(namesOfMasked)) + length(maskedBySparkR) <- l + length(namesOfMasked) <- l + expect_equal(sort(maskedBySparkR, na.last = TRUE), sort(namesOfMasked, na.last = TRUE)) + # above are those reported as masked when `library(SparkR)` + # note that many of these methods are still callable without base:: or stats:: prefix + # there should be a test for each of these, except followings, which are currently "broken" + funcHasAny <- unlist(lapply(masked, function(x) { + any(grepl("=\"ANY\"", capture.output(showMethods(x)[-1]))) + })) + maskedCompletely <- masked[!funcHasAny] + expect_equal(length(maskedCompletely), length(namesOfMaskedCompletely)) + l <- max(length(maskedCompletely), length(namesOfMaskedCompletely)) + length(maskedCompletely) <- l + length(namesOfMaskedCompletely) <- l + expect_equal(sort(maskedCompletely, na.last = TRUE), + sort(namesOfMaskedCompletely, na.last = TRUE)) +}) + +test_that("repeatedly starting and stopping SparkR", { + for (i in 1:4) { + sc <- suppressWarnings(sparkR.init()) + rdd <- parallelize(sc, 1:20, 2L) + expect_equal(countRDD(rdd), 20) + suppressWarnings(sparkR.stop()) + } +}) + +test_that("repeatedly starting and stopping SparkSession", { + for (i in 1:4) { + sparkR.session(enableHiveSupport = FALSE) + df <- createDataFrame(data.frame(dummy = 1:i)) + expect_equal(count(df), i) + sparkR.session.stop() + } +}) + +test_that("rdd GC across sparkR.stop", { + sc <- sparkR.sparkContext() # sc should get id 0 + rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1 + rdd2 <- parallelize(sc, 1:10, 2L) # rdd2 should get id 2 + sparkR.session.stop() + + sc <- sparkR.sparkContext() # sc should get id 0 again + + # GC rdd1 before creating rdd3 and rdd2 after + rm(rdd1) + gc() + + rdd3 <- parallelize(sc, 1:20, 2L) # rdd3 should get id 1 now + rdd4 <- parallelize(sc, 1:10, 2L) # rdd4 should get id 2 now + + rm(rdd2) + gc() + + countRDD(rdd3) + countRDD(rdd4) + sparkR.session.stop() +}) + +test_that("job group functions can be called", { + sc <- sparkR.sparkContext() + setJobGroup("groupId", "job description", TRUE) + cancelJobGroup("groupId") + clearJobGroup() + + suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE)) + suppressWarnings(cancelJobGroup(sc, "groupId")) + suppressWarnings(clearJobGroup(sc)) + sparkR.session.stop() +}) + +test_that("utility function can be called", { + sparkR.sparkContext() + setLogLevel("ERROR") + sparkR.session.stop() +}) + +test_that("getClientModeSparkSubmitOpts() returns spark-submit args from whitelist", { + e <- new.env() + e[["spark.driver.memory"]] <- "512m" + ops <- getClientModeSparkSubmitOpts("sparkrmain", e) + expect_equal("--driver-memory \"512m\" sparkrmain", ops) + + e[["spark.driver.memory"]] <- "5g" + e[["spark.driver.extraClassPath"]] <- "/opt/class_path" # nolint + e[["spark.driver.extraJavaOptions"]] <- "-XX:+UseCompressedOops -XX:+UseCompressedStrings" + e[["spark.driver.extraLibraryPath"]] <- "/usr/local/hadoop/lib" # nolint + e[["random"]] <- "skipthis" + ops2 <- getClientModeSparkSubmitOpts("sparkr-shell", e) + # nolint start + expect_equal(ops2, paste0("--driver-class-path \"/opt/class_path\" --driver-java-options \"", + "-XX:+UseCompressedOops -XX:+UseCompressedStrings\" --driver-library-path \"", + "/usr/local/hadoop/lib\" --driver-memory \"5g\" sparkr-shell")) + # nolint end + + e[["spark.driver.extraClassPath"]] <- "/" # too short + ops3 <- getClientModeSparkSubmitOpts("--driver-memory 4g sparkr-shell2", e) + # nolint start + expect_equal(ops3, paste0("--driver-java-options \"-XX:+UseCompressedOops ", + "-XX:+UseCompressedStrings\" --driver-library-path \"/usr/local/hadoop/lib\"", + " --driver-memory 4g sparkr-shell2")) + # nolint end +}) + +test_that("sparkJars sparkPackages as comma-separated strings", { + expect_warning(processSparkJars(" a, b ")) + jars <- suppressWarnings(processSparkJars(" a, b ")) + expect_equal(lapply(jars, basename), list("a", "b")) + + jars <- suppressWarnings(processSparkJars(" abc ,, def ")) + expect_equal(lapply(jars, basename), list("abc", "def")) + + jars <- suppressWarnings(processSparkJars(c(" abc ,, def ", "", "xyz", " ", "a,b"))) + expect_equal(lapply(jars, basename), list("abc", "def", "xyz", "a", "b")) + + p <- processSparkPackages(c("ghi", "lmn")) + expect_equal(p, c("ghi", "lmn")) + + # check normalizePath + f <- dir()[[1]] + expect_warning(processSparkJars(f), NA) + expect_match(processSparkJars(f), f) +}) + +test_that("spark.lapply should perform simple transforms", { + sparkR.sparkContext() + doubled <- spark.lapply(1:10, function(x) { 2 * x }) + expect_equal(doubled, as.list(2 * 1:10)) + sparkR.session.stop() +}) + +test_that("add and get file to be downloaded with Spark job on every node", { + sparkR.sparkContext() + # Test add file. + path <- tempfile(pattern = "hello", fileext = ".txt") + filename <- basename(path) + words <- "Hello World!" + writeLines(words, path) + spark.addFile(path) + download_path <- spark.getSparkFiles(filename) + expect_equal(readLines(download_path), words) + + # Test spark.getSparkFiles works well on executors. + seq <- seq(from = 1, to = 10, length.out = 5) + f <- function(seq) { spark.getSparkFiles(filename) } + results <- spark.lapply(seq, f) + for (i in 1:5) { expect_equal(basename(results[[i]]), filename) } + + unlink(path) + + # Test add directory recursively. + path <- paste0(tempdir(), "/", "recursive_dir") + dir.create(path) + dir_name <- basename(path) + path1 <- paste0(path, "/", "hello.txt") + file.create(path1) + sub_path <- paste0(path, "/", "sub_hello") + dir.create(sub_path) + path2 <- paste0(sub_path, "/", "sub_hello.txt") + file.create(path2) + words <- "Hello World!" + sub_words <- "Sub Hello World!" + writeLines(words, path1) + writeLines(sub_words, path2) + spark.addFile(path, recursive = TRUE) + download_path1 <- spark.getSparkFiles(paste0(dir_name, "/", "hello.txt")) + expect_equal(readLines(download_path1), words) + download_path2 <- spark.getSparkFiles(paste0(dir_name, "/", "sub_hello/sub_hello.txt")) + expect_equal(readLines(download_path2), sub_words) + unlink(path, recursive = TRUE) + sparkR.session.stop() +}) http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_includePackage.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_includePackage.R b/R/pkg/tests/fulltests/test_includePackage.R new file mode 100644 index 0000000..563ea29 --- /dev/null +++ b/R/pkg/tests/fulltests/test_includePackage.R @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("include R packages") + +# JavaSparkContext handle +sparkSession <- sparkR.session(enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +# Partitioned data +nums <- 1:2 +rdd <- parallelize(sc, nums, 2L) + +test_that("include inside function", { + # Only run the test if plyr is installed. + if ("plyr" %in% rownames(installed.packages())) { + suppressPackageStartupMessages(library(plyr)) + generateData <- function(x) { + suppressPackageStartupMessages(library(plyr)) + attach(airquality) + result <- transform(Ozone, logOzone = log(Ozone)) + result + } + + data <- lapplyPartition(rdd, generateData) + actual <- collectRDD(data) + } +}) + +test_that("use include package", { + # Only run the test if plyr is installed. + if ("plyr" %in% rownames(installed.packages())) { + suppressPackageStartupMessages(library(plyr)) + generateData <- function(x) { + attach(airquality) + result <- transform(Ozone, logOzone = log(Ozone)) + result + } + + includePackage(sc, plyr) + data <- lapplyPartition(rdd, generateData) + actual <- collectRDD(data) + } +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/ae4e8ae4/R/pkg/tests/fulltests/test_jvm_api.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_jvm_api.R b/R/pkg/tests/fulltests/test_jvm_api.R new file mode 100644 index 0000000..7348c89 --- /dev/null +++ b/R/pkg/tests/fulltests/test_jvm_api.R @@ -0,0 +1,36 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("JVM API") + +sparkSession <- sparkR.session(enableHiveSupport = FALSE) + +test_that("Create and call methods on object", { + jarr <- sparkR.newJObject("java.util.ArrayList") + # Add an element to the array + sparkR.callJMethod(jarr, "add", 1L) + # Check if get returns the same element + expect_equal(sparkR.callJMethod(jarr, "get", 0L), 1L) +}) + +test_that("Call static methods", { + # Convert a boolean to a string + strTrue <- sparkR.callJStatic("java.lang.String", "valueOf", TRUE) + expect_equal(strTrue, "true") +}) + +sparkR.session.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org