Repository: spark Updated Branches: refs/heads/branch-2.2 815a0820b -> 0b0be47e7
http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_streaming.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_streaming.R b/R/pkg/tests/fulltests/test_streaming.R new file mode 100644 index 0000000..b20b431 --- /dev/null +++ b/R/pkg/tests/fulltests/test_streaming.R @@ -0,0 +1,167 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("Structured Streaming") + +# Tests for Structured Streaming functions in SparkR + +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) + +jsonSubDir <- file.path("sparkr-test", "json", "") +if (.Platform$OS.type == "windows") { + # file.path removes the empty separator on Windows, adds it back + jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep) +} +jsonDir <- file.path(tempdir(), jsonSubDir) +dir.create(jsonDir, recursive = TRUE) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp") +writeLines(mockLines, jsonPath) + +mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}") +jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp") + +schema <- structType(structField("name", "string"), + structField("age", "integer"), + structField("count", "double")) + +test_that("read.stream, write.stream, awaitTermination, stopQuery", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3) + + writeLines(mockLinesNa, jsonPathNa) + awaitTermination(q, 5 * 1000) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_error(awaitTermination(q), NA) +}) + +test_that("print from explain, lastProgress, status, isActive", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete") + + awaitTermination(q, 5 * 1000) + callJMethod(q@ssq, "processAllAvailable") + + expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==") + expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q))))) + expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q))))) + + expect_equal(queryName(q), "people2") + expect_true(isActive(q)) + + stopQuery(q) +}) + +test_that("Stream other format", { + skip_on_cran() + + parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") + df <- read.df(jsonPath, "json", schema) + write.df(df, parquetPath, "parquet", "overwrite") + + df <- read.stream(path = parquetPath, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + callJMethod(q@ssq, "processAllAvailable") + expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3) + + expect_equal(queryName(q), "people3") + expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet", + capture.output(lastProgress(q))))) + expect_true(isActive(q)) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_false(isActive(q)) + + unlink(parquetPath) +}) + +test_that("Non-streaming DataFrame", { + skip_on_cran() + + c <- as.DataFrame(cars) + expect_false(isStreaming(c)) + + expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"), + paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ", + "streaming Dataset/DataFrame).*")) +}) + +test_that("Unsupported operation", { + skip_on_cran() + + # memory sink without aggregation + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) + expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"), + paste0(".*(start : analysis error - Complete output mode not supported when there ", + "are no streaming aggregations on streaming DataFrames/Datasets).*")) +}) + +test_that("Terminated by error", { + skip_on_cran() + + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1) + counts <- count(group_by(df, "name")) + # This would not fail before returning with a StreamingQuery, + # but could dump error log at just about the same time + expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"), + NA) + + expect_error(awaitTermination(q, 5 * 1000), + paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option", + " 'maxFilesPerTrigger', must be a positive integer).*")) + + expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value", + capture.output(status(q))))) + expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q))))) + expect_equal(queryName(q), "people4") + expect_false(isActive(q)) + + stopQuery(q) +}) + +unlink(jsonPath) +unlink(jsonPathNa) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_take.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_take.R b/R/pkg/tests/fulltests/test_take.R new file mode 100644 index 0000000..c00723b --- /dev/null +++ b/R/pkg/tests/fulltests/test_take.R @@ -0,0 +1,71 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("tests RDD function take()") + +# Mock data +numVector <- c(-10:97) +numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10) +strVector <- c("Dexter Morgan: I suppose I should be upset, even feel", + "violated, but I'm not. No, in fact, I think this is a friendly", + "message, like \"Hey, wanna play?\" and yes, I want to play. ", + "I really, really do.") +strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ", + "other times it helps me control the chaos.", + "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ", + "raising me. But they're both dead now. I didn't kill them. Honest.") + +# JavaSparkContext handle +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +test_that("take() gives back the original elements in correct count and order", { + skip_on_cran() + + numVectorRDD <- parallelize(sc, numVector, 10) + # case: number of elements to take is less than the size of the first partition + expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1))) + # case: number of elements to take is the same as the size of the first partition + expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11))) + # case: number of elements to take is greater than all elements + expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector)) + expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), as.list(numVector)) + + numListRDD <- parallelize(sc, numList, 1) + numListRDD2 <- parallelize(sc, numList, 4) + expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3)) + expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5)) + expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1))) + expect_equal(takeRDD(numListRDD2, 999), numList) + + strVectorRDD <- parallelize(sc, strVector, 2) + strVectorRDD2 <- parallelize(sc, strVector, 3) + expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector)) + expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2))) + + strListRDD <- parallelize(sc, strList, 4) + strListRDD2 <- parallelize(sc, strList, 1) + expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3))) + expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1))) + + expect_equal(length(takeRDD(strListRDD, 0)), 0) + expect_equal(length(takeRDD(strVectorRDD, 0)), 0) + expect_equal(length(takeRDD(numListRDD, 0)), 0) + expect_equal(length(takeRDD(numVectorRDD, 0)), 0) +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_textFile.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_textFile.R b/R/pkg/tests/fulltests/test_textFile.R new file mode 100644 index 0000000..e8a961c --- /dev/null +++ b/R/pkg/tests/fulltests/test_textFile.R @@ -0,0 +1,182 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("the textFile() function") + +# JavaSparkContext handle +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +mockFile <- c("Spark is pretty.", "Spark is awesome.") + +test_that("textFile() on a local file returns an RDD", { + skip_on_cran() + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName) + expect_is(rdd, "RDD") + expect_true(countRDD(rdd) > 0) + expect_equal(countRDD(rdd), 2) + + unlink(fileName) +}) + +test_that("textFile() followed by a collect() returns the same content", { + skip_on_cran() + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName) + expect_equal(collectRDD(rdd), as.list(mockFile)) + + unlink(fileName) +}) + +test_that("textFile() word count works as expected", { + skip_on_cran() + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName) + + words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + counts <- reduceByKey(wordCount, "+", 2L) + output <- collectRDD(counts) + expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1), + list("Spark", 2)) + expect_equal(sortKeyValueList(output), sortKeyValueList(expected)) + + unlink(fileName) +}) + +test_that("several transformations on RDD created by textFile()", { + skip_on_cran() + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName) # RDD + for (i in 1:10) { + # PipelinedRDD initially created from RDD + rdd <- lapply(rdd, function(x) paste(x, x)) + } + collectRDD(rdd) + + unlink(fileName) +}) + +test_that("textFile() followed by a saveAsTextFile() returns the same content", { + skip_on_cran() + + fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") + fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1, 1L) + saveAsTextFile(rdd, fileName2) + rdd <- textFile(sc, fileName2) + expect_equal(collectRDD(rdd), as.list(mockFile)) + + unlink(fileName1) + unlink(fileName2) +}) + +test_that("saveAsTextFile() on a parallelized list works as expected", { + skip_on_cran() + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + l <- list(1, 2, 3) + rdd <- parallelize(sc, l, 1L) + saveAsTextFile(rdd, fileName) + rdd <- textFile(sc, fileName) + expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)})) + + unlink(fileName) +}) + +test_that("textFile() and saveAsTextFile() word count works as expected", { + skip_on_cran() + + fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") + fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName1) + + rdd <- textFile(sc, fileName1) + + words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] }) + wordCount <- lapply(words, function(word) { list(word, 1L) }) + + counts <- reduceByKey(wordCount, "+", 2L) + + saveAsTextFile(counts, fileName2) + rdd <- textFile(sc, fileName2) + + output <- collectRDD(rdd) + expected <- list(list("awesome.", 1), list("Spark", 2), + list("pretty.", 1), list("is", 2)) + expectedStr <- lapply(expected, function(x) { toString(x) }) + expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr)) + + unlink(fileName1) + unlink(fileName2) +}) + +test_that("textFile() on multiple paths", { + skip_on_cran() + + fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp") + fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines("Spark is pretty.", fileName1) + writeLines("Spark is awesome.", fileName2) + + rdd <- textFile(sc, c(fileName1, fileName2)) + expect_equal(countRDD(rdd), 2) + + unlink(fileName1) + unlink(fileName2) +}) + +test_that("Pipelined operations on RDDs created using textFile", { + skip_on_cran() + + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + rdd <- textFile(sc, fileName) + + lengths <- lapply(rdd, function(x) { length(x) }) + expect_equal(collectRDD(lengths), list(1, 1)) + + lengthsPipelined <- lapply(lengths, function(x) { x + 10 }) + expect_equal(collectRDD(lengthsPipelined), list(11, 11)) + + lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 }) + expect_equal(collectRDD(lengths30), list(31, 31)) + + lengths20 <- lapply(lengths, function(x) { x + 20 }) + expect_equal(collectRDD(lengths20), list(21, 21)) + + unlink(fileName) +}) + +sparkR.session.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_utils.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/fulltests/test_utils.R b/R/pkg/tests/fulltests/test_utils.R new file mode 100644 index 0000000..0161471 --- /dev/null +++ b/R/pkg/tests/fulltests/test_utils.R @@ -0,0 +1,247 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +context("functions in utils.R") + +# JavaSparkContext handle +sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE) +sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession) + +test_that("convertJListToRList() gives back (deserializes) the original JLists + of strings and integers", { + skip_on_cran() + # It's hard to manually create a Java List using rJava, since it does not + # support generics well. Instead, we rely on collectRDD() returning a + # JList. + nums <- as.list(1:10) + rdd <- parallelize(sc, nums, 1L) + jList <- callJMethod(rdd@jrdd, "collect") + rList <- convertJListToRList(jList, flatten = TRUE) + expect_equal(rList, nums) + + strs <- as.list("hello", "spark") + rdd <- parallelize(sc, strs, 2L) + jList <- callJMethod(rdd@jrdd, "collect") + rList <- convertJListToRList(jList, flatten = TRUE) + expect_equal(rList, strs) +}) + +test_that("serializeToBytes on RDD", { + skip_on_cran() + # File content + mockFile <- c("Spark is pretty.", "Spark is awesome.") + fileName <- tempfile(pattern = "spark-test", fileext = ".tmp") + writeLines(mockFile, fileName) + + text.rdd <- textFile(sc, fileName) + expect_equal(getSerializedMode(text.rdd), "string") + ser.rdd <- serializeToBytes(text.rdd) + expect_equal(collectRDD(ser.rdd), as.list(mockFile)) + expect_equal(getSerializedMode(ser.rdd), "byte") + + unlink(fileName) +}) + +test_that("cleanClosure on R functions", { + y <- c(1, 2, 3) + g <- function(x) { x + 1 } + f <- function(x) { g(x) + y } + newF <- cleanClosure(f) + env <- environment(newF) + expect_equal(length(ls(env)), 2) # y, g + actual <- get("y", envir = env, inherits = FALSE) + expect_equal(actual, y) + actual <- get("g", envir = env, inherits = FALSE) + expect_equal(actual, g) + + # Test for nested enclosures and package variables. + env2 <- new.env() + funcEnv <- new.env(parent = env2) + f <- function(x) { log(g(x) + y) } + environment(f) <- funcEnv # enclosing relationship: f -> funcEnv -> env2 -> .GlobalEnv + newF <- cleanClosure(f) + env <- environment(newF) + expect_equal(length(ls(env)), 2) # "min" should not be included + actual <- get("y", envir = env, inherits = FALSE) + expect_equal(actual, y) + actual <- get("g", envir = env, inherits = FALSE) + expect_equal(actual, g) + + base <- c(1, 2, 3) + l <- list(field = matrix(1)) + field <- matrix(2) + defUse <- 3 + g <- function(x) { x + y } + f <- function(x) { + defUse <- base::as.integer(x) + 1 # Test for access operators `::`. + lapply(x, g) + 1 # Test for capturing function call "g"'s closure as a argument of lapply. + l$field[1, 1] <- 3 # Test for access operators `$`. + res <- defUse + l$field[1, ] # Test for def-use chain of "defUse", and "" symbol. + f(res) # Test for recursive calls. + } + newF <- cleanClosure(f) + env <- environment(newF) + # TODO(shivaram): length(ls(env)) is 4 here for some reason and `lapply` is included in `env`. + # Disabling this test till we debug this. + # + # nolint start + # expect_equal(length(ls(env)), 3) # Only "g", "l" and "f". No "base", "field" or "defUse". + # nolint end + expect_true("g" %in% ls(env)) + expect_true("l" %in% ls(env)) + expect_true("f" %in% ls(env)) + expect_equal(get("l", envir = env, inherits = FALSE), l) + # "y" should be in the environemnt of g. + newG <- get("g", envir = env, inherits = FALSE) + env <- environment(newG) + expect_equal(length(ls(env)), 1) + actual <- get("y", envir = env, inherits = FALSE) + expect_equal(actual, y) + + # Test for function (and variable) definitions. + f <- function(x) { + g <- function(y) { y * 2 } + g(x) + } + newF <- cleanClosure(f) + env <- environment(newF) + expect_equal(length(ls(env)), 0) # "y" and "g" should not be included. + + # Test for overriding variables in base namespace (Issue: SparkR-196). + nums <- as.list(1:10) + rdd <- parallelize(sc, nums, 2L) + t <- 4 # Override base::t in .GlobalEnv. + f <- function(x) { x > t } + newF <- cleanClosure(f) + env <- environment(newF) + expect_equal(ls(env), "t") + expect_equal(get("t", envir = env, inherits = FALSE), t) + actual <- collectRDD(lapply(rdd, f)) + expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6))) + expect_equal(actual, expected) + + # Test for broadcast variables. + a <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) + aBroadcast <- broadcast(sc, a) + normMultiply <- function(x) { norm(aBroadcast$value) * x } + newnormMultiply <- SparkR:::cleanClosure(normMultiply) + env <- environment(newnormMultiply) + expect_equal(ls(env), "aBroadcast") + expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast) +}) + +test_that("varargsToJProperties", { + jprops <- newJObject("java.util.Properties") + expect_true(class(jprops) == "jobj") + + jprops <- varargsToJProperties(abc = "123") + expect_true(class(jprops) == "jobj") + expect_equal(callJMethod(jprops, "getProperty", "abc"), "123") + + jprops <- varargsToJProperties(abc = "abc", b = 1) + expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc") + expect_equal(callJMethod(jprops, "getProperty", "b"), "1") + + jprops <- varargsToJProperties() + expect_equal(callJMethod(jprops, "size"), 0L) +}) + +test_that("convertToJSaveMode", { + s <- convertToJSaveMode("error") + expect_true(class(s) == "jobj") + expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ") + expect_error(convertToJSaveMode("foo"), + 'mode should be one of "append", "overwrite", "error", "ignore"') #nolint +}) + +test_that("captureJVMException", { + skip_on_cran() + method <- "getSQLDataType" + expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", method, + "unknown"), + error = function(e) { + captureJVMException(e, method) + }), + "Error in getSQLDataType : illegal argument - Invalid type unknown") +}) + +test_that("hashCode", { + skip_on_cran() + + expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA) +}) + +test_that("overrideEnvs", { + config <- new.env() + config[["spark.master"]] <- "foo" + config[["config_only"]] <- "ok" + param <- new.env() + param[["spark.master"]] <- "local" + param[["param_only"]] <- "blah" + overrideEnvs(config, param) + expect_equal(config[["spark.master"]], "local") + expect_equal(config[["param_only"]], "blah") + expect_equal(config[["config_only"]], "ok") +}) + +test_that("rbindRaws", { + + # Mixed Column types + r <- serialize(1:5, connection = NULL) + r1 <- serialize(1, connection = NULL) + r2 <- serialize(letters, connection = NULL) + r3 <- serialize(1:10, connection = NULL) + inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r), + list(3L, r3, "c", r)) + expected <- data.frame(V1 = 1:3) + expected$V2 <- list(r1, r2, r3) + expected$V3 <- c("a", "b", "c") + expected$V4 <- list(r, r, r) + result <- rbindRaws(inputData) + expect_equal(expected, result) + + # Single binary column + input <- list(list(r1), list(r2), list(r3)) + expected <- subset(expected, select = "V2") + result <- setNames(rbindRaws(input), "V2") + expect_equal(expected, result) + +}) + +test_that("varargsToStrEnv", { + strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd") + env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd") + expect_equal(strenv, env) + expect_error(varargsToStrEnv(a = list(1, "a")), + paste0("Unsupported type for a : list. Supported types are logical, ", + "numeric, character and NULL.")) + expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 2, 3, 4.") + expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 2, 3, 4.") +}) + +test_that("basenameSansExtFromUrl", { + x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-", + "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz") + expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7") + z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz" + expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive") +}) + +sparkR.session.stop() + +message("--- End test (utils) ", as.POSIXct(Sys.time(), tz = "GMT")) +message("elapsed ", (proc.time() - timer_ptm)[3]) http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/run-all.R ---------------------------------------------------------------------- diff --git a/R/pkg/tests/run-all.R b/R/pkg/tests/run-all.R index f0bef4f..d48e36c 100644 --- a/R/pkg/tests/run-all.R +++ b/R/pkg/tests/run-all.R @@ -43,3 +43,11 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) { } test_package("SparkR") + +if (identical(Sys.getenv("NOT_CRAN"), "true")) { + # for testthat 1.0.2 later, change reporter from "summary" to default_reporter() + testthat:::run_tests("SparkR", + file.path(sparkRDir, "pkg", "tests", "fulltests"), + NULL, + "summary") +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org