Repository: spark
Updated Branches:
  refs/heads/master 5301a19a0 -> dc4c35183


http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_streaming.R 
b/R/pkg/tests/fulltests/test_streaming.R
new file mode 100644
index 0000000..b20b431
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("Structured Streaming")
+
+# Tests for Structured Streaming functions in SparkR
+
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+
+jsonSubDir <- file.path("sparkr-test", "json", "")
+if (.Platform$OS.type == "windows") {
+  # file.path removes the empty separator on Windows, so add it back
+  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
+}
+jsonDir <- file.path(tempdir(), jsonSubDir)
+dir.create(jsonDir, recursive = TRUE)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+               "{\"name\":\"Andy\", \"age\":30}",
+               "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+writeLines(mockLines, jsonPath)
+
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+                 "{\"name\":\"David\",\"age\":60,\"height\":null}")
+jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+
+schema <- structType(structField("name", "string"),
+                     structField("age", "integer"),
+                     structField("count", "double"))
+
+test_that("read.stream, write.stream, awaitTermination, stopQuery", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
+
+  writeLines(mockLinesNa, jsonPathNa)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_error(awaitTermination(q), NA)
+})
+
+test_that("print from explain, lastProgress, status, isActive", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people2", outputMode = 
"complete")
+
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
+  expect_true(any(grepl("\"description\" : \"MemorySink\"", 
capture.output(lastProgress(q)))))
+  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
+
+  expect_equal(queryName(q), "people2")
+  expect_true(isActive(q))
+
+  stopQuery(q)
+})
+
+test_that("Stream other format", {
+  skip_on_cran()
+
+  # Re-save the mock JSON records as parquet so a second file format can be streamed.
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  df <- read.df(jsonPath, "json", schema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
+
+  # The query is still running, so a bounded wait is expected to return FALSE.
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+
+  expect_equal(queryName(q), "people3")
+  expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
+              capture.output(lastProgress(q)))))
+  expect_true(isActive(q))
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_false(isActive(q))
+
+  # The parquet output is a directory; unlink() only deletes directories
+  # when recursive = TRUE.
+  unlink(parquetPath, recursive = TRUE)
+})
+
+test_that("Non-streaming DataFrame", {
+  skip_on_cran()
+
+  c <- as.DataFrame(cars)
+  expect_false(isStreaming(c))
+
+  expect_error(write.stream(c, "memory", queryName = "people", outputMode = 
"complete"),
+               paste0(".*(writeStream : analysis error - 'writeStream' can be 
called only on ",
+                      "streaming Dataset/DataFrame).*"))
+})
+
+test_that("Unsupported operation", {
+  skip_on_cran()
+
+  # memory sink without aggregation
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+  expect_error(write.stream(df, "memory", queryName = "people", outputMode = 
"complete"),
+               paste0(".*(start : analysis error - Complete output mode not 
supported when there ",
+                      "are no streaming aggregations on streaming 
DataFrames/Datasets).*"))
+})
+
+test_that("Terminated by error", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = -1)
+  counts <- count(group_by(df, "name"))
+  # This would not fail before returning with a StreamingQuery,
+  # but could dump error log at just about the same time
+  expect_error(q <- write.stream(counts, "memory", queryName = "people4", 
outputMode = "complete"),
+               NA)
+
+  expect_error(awaitTermination(q, 5 * 1000),
+               paste0(".*(awaitTermination : streaming query error - Invalid 
value '-1' for option",
+                      " 'maxFilesPerTrigger', must be a positive integer).*"))
+
+  expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid 
value",
+              capture.output(status(q)))))
+  expect_true(any(grepl("Streaming query has no progress", 
capture.output(lastProgress(q)))))
+  expect_equal(queryName(q), "people4")
+  expect_false(isActive(q))
+
+  stopQuery(q)
+})
+
+unlink(jsonPath)
+unlink(jsonPathNa)
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_take.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_take.R 
b/R/pkg/tests/fulltests/test_take.R
new file mode 100644
index 0000000..c00723b
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_take.R
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("tests RDD function take()")
+
+# Mock data
+numVector <- c(-10:97)
+numList <- list(sqrt(1), sqrt(2), sqrt(3), 4 ** 10)
+strVector <- c("Dexter Morgan: I suppose I should be upset, even feel",
+               "violated, but I'm not. No, in fact, I think this is a 
friendly",
+               "message, like \"Hey, wanna play?\" and yes, I want to play. ",
+               "I really, really do.")
+strList <- list("Dexter Morgan: Blood. Sometimes it sets my teeth on edge, ",
+                "other times it helps me control the chaos.",
+                "Dexter Morgan: Harry and Dorris Morgan did a wonderful job ",
+                "raising me. But they're both dead now. I didn't kill them. 
Honest.")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
+
+test_that("take() gives back the original elements in correct count and 
order", {
+  skip_on_cran()
+
+  numVectorRDD <- parallelize(sc, numVector, 10)
+  # case: number of elements to take is less than the size of the first 
partition
+  expect_equal(takeRDD(numVectorRDD, 1), as.list(head(numVector, n = 1)))
+  # case: number of elements to take is the same as the size of the first 
partition
+  expect_equal(takeRDD(numVectorRDD, 11), as.list(head(numVector, n = 11)))
+  # case: number of elements to take is greater than all elements
+  expect_equal(takeRDD(numVectorRDD, length(numVector)), as.list(numVector))
+  expect_equal(takeRDD(numVectorRDD, length(numVector) + 1), 
as.list(numVector))
+
+  numListRDD <- parallelize(sc, numList, 1)
+  numListRDD2 <- parallelize(sc, numList, 4)
+  expect_equal(takeRDD(numListRDD, 3), takeRDD(numListRDD2, 3))
+  expect_equal(takeRDD(numListRDD, 5), takeRDD(numListRDD2, 5))
+  expect_equal(takeRDD(numListRDD, 1), as.list(head(numList, n = 1)))
+  expect_equal(takeRDD(numListRDD2, 999), numList)
+
+  strVectorRDD <- parallelize(sc, strVector, 2)
+  strVectorRDD2 <- parallelize(sc, strVector, 3)
+  expect_equal(takeRDD(strVectorRDD, 4), as.list(strVector))
+  expect_equal(takeRDD(strVectorRDD2, 2), as.list(head(strVector, n = 2)))
+
+  strListRDD <- parallelize(sc, strList, 4)
+  strListRDD2 <- parallelize(sc, strList, 1)
+  expect_equal(takeRDD(strListRDD, 3), as.list(head(strList, n = 3)))
+  expect_equal(takeRDD(strListRDD2, 1), as.list(head(strList, n = 1)))
+
+  expect_equal(length(takeRDD(strListRDD, 0)), 0)
+  expect_equal(length(takeRDD(strVectorRDD, 0)), 0)
+  expect_equal(length(takeRDD(numListRDD, 0)), 0)
+  expect_equal(length(takeRDD(numVectorRDD, 0)), 0)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_textFile.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_textFile.R 
b/R/pkg/tests/fulltests/test_textFile.R
new file mode 100644
index 0000000..e8a961c
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_textFile.R
@@ -0,0 +1,182 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("the textFile() function")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
+
+mockFile <- c("Spark is pretty.", "Spark is awesome.")
+
+test_that("textFile() on a local file returns an RDD", {
+  skip_on_cran()
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  rdd <- textFile(sc, fileName)
+  expect_is(rdd, "RDD")
+  expect_true(countRDD(rdd) > 0)
+  expect_equal(countRDD(rdd), 2)
+
+  unlink(fileName)
+})
+
+test_that("textFile() followed by a collect() returns the same content", {
+  skip_on_cran()
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  rdd <- textFile(sc, fileName)
+  expect_equal(collectRDD(rdd), as.list(mockFile))
+
+  unlink(fileName)
+})
+
+test_that("textFile() word count works as expected", {
+  skip_on_cran()
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  rdd <- textFile(sc, fileName)
+
+  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+  wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+  counts <- reduceByKey(wordCount, "+", 2L)
+  output <- collectRDD(counts)
+  expected <- list(list("pretty.", 1), list("is", 2), list("awesome.", 1),
+                   list("Spark", 2))
+  expect_equal(sortKeyValueList(output), sortKeyValueList(expected))
+
+  unlink(fileName)
+})
+
+test_that("several transformations on RDD created by textFile()", {
+  skip_on_cran()
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  rdd <- textFile(sc, fileName) # RDD
+  for (i in 1:10) {
+    # PipelinedRDD initially created from RDD
+    rdd <- lapply(rdd, function(x) paste(x, x))
+  }
+  collectRDD(rdd)
+
+  unlink(fileName)
+})
+
+test_that("textFile() followed by a saveAsTextFile() returns the same content", {
+  skip_on_cran()
+
+  # fileName1 is the plain input file; fileName2 is the saveAsTextFile() target.
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1, 1L)
+  saveAsTextFile(rdd, fileName2)
+  rdd <- textFile(sc, fileName2)
+  expect_equal(collectRDD(rdd), as.list(mockFile))
+
+  unlink(fileName1)
+  # saveAsTextFile() output is a directory, which unlink() skips unless
+  # recursive = TRUE.
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("saveAsTextFile() on a parallelized list works as expected", {
+  skip_on_cran()
+
+  # fileName is only ever used as the saveAsTextFile() target.
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  l <- list(1, 2, 3)
+  rdd <- parallelize(sc, l, 1L)
+  saveAsTextFile(rdd, fileName)
+  rdd <- textFile(sc, fileName)
+  # Reading the text output back yields strings, so compare with toString().
+  expect_equal(collectRDD(rdd), lapply(l, function(x) {toString(x)}))
+
+  # saveAsTextFile() output is a directory, which unlink() skips unless
+  # recursive = TRUE.
+  unlink(fileName, recursive = TRUE)
+})
+
+test_that("textFile() and saveAsTextFile() word count works as expected", {
+  skip_on_cran()
+
+  # fileName1 is the plain input file; fileName2 is the saveAsTextFile() target.
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName1)
+
+  rdd <- textFile(sc, fileName1)
+
+  words <- flatMap(rdd, function(line) { strsplit(line, " ")[[1]] })
+  wordCount <- lapply(words, function(word) { list(word, 1L) })
+
+  counts <- reduceByKey(wordCount, "+", 2L)
+
+  saveAsTextFile(counts, fileName2)
+  rdd <- textFile(sc, fileName2)
+
+  output <- collectRDD(rdd)
+  expected <- list(list("awesome.", 1), list("Spark", 2),
+                   list("pretty.", 1), list("is", 2))
+  # The saved pairs are read back as text lines, so compare against strings.
+  expectedStr <- lapply(expected, function(x) { toString(x) })
+  expect_equal(sortKeyValueList(output), sortKeyValueList(expectedStr))
+
+  unlink(fileName1)
+  # saveAsTextFile() output is a directory, which unlink() skips unless
+  # recursive = TRUE.
+  unlink(fileName2, recursive = TRUE)
+})
+
+test_that("textFile() on multiple paths", {
+  skip_on_cran()
+
+  fileName1 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  fileName2 <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines("Spark is pretty.", fileName1)
+  writeLines("Spark is awesome.", fileName2)
+
+  rdd <- textFile(sc, c(fileName1, fileName2))
+  expect_equal(countRDD(rdd), 2)
+
+  unlink(fileName1)
+  unlink(fileName2)
+})
+
+test_that("Pipelined operations on RDDs created using textFile", {
+  skip_on_cran()
+
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  rdd <- textFile(sc, fileName)
+
+  lengths <- lapply(rdd, function(x) { length(x) })
+  expect_equal(collectRDD(lengths), list(1, 1))
+
+  lengthsPipelined <- lapply(lengths, function(x) { x + 10 })
+  expect_equal(collectRDD(lengthsPipelined), list(11, 11))
+
+  lengths30 <- lapply(lengthsPipelined, function(x) { x + 20 })
+  expect_equal(collectRDD(lengths30), list(31, 31))
+
+  lengths20 <- lapply(lengths, function(x) { x + 20 })
+  expect_equal(collectRDD(lengths20), list(21, 21))
+
+  unlink(fileName)
+})
+
+sparkR.session.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/fulltests/test_utils.R 
b/R/pkg/tests/fulltests/test_utils.R
new file mode 100644
index 0000000..6197ae7
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_utils.R
@@ -0,0 +1,248 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+context("functions in utils.R")
+
+# JavaSparkContext handle
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = 
FALSE)
+sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
"getJavaSparkContext", sparkSession)
+
+test_that("convertJListToRList() gives back (deserializes) the original JLists
+          of strings and integers", {
+  skip_on_cran()
+  # It's hard to manually create a Java List using rJava, since it does not
+  # support generics well. Instead, we call "collect" on the underlying Java
+  # RDD, which hands back a JList that convertJListToRList() can deserialize.
+  nums <- as.list(1:10)
+  rdd <- parallelize(sc, nums, 1L)
+  jList <- callJMethod(rdd@jrdd, "collect")
+  rList <- convertJListToRList(jList, flatten = TRUE)
+  expect_equal(rList, nums)
+
+  # Use list() here: as.list("hello", "spark") would silently drop "spark",
+  # because as.list() converts only its first argument.
+  strs <- list("hello", "spark")
+  rdd <- parallelize(sc, strs, 2L)
+  jList <- callJMethod(rdd@jrdd, "collect")
+  rList <- convertJListToRList(jList, flatten = TRUE)
+  expect_equal(rList, strs)
+})
+
+test_that("serializeToBytes on RDD", {
+  skip_on_cran()
+  # File content
+  mockFile <- c("Spark is pretty.", "Spark is awesome.")
+  fileName <- tempfile(pattern = "spark-test", fileext = ".tmp")
+  writeLines(mockFile, fileName)
+
+  text.rdd <- textFile(sc, fileName)
+  expect_equal(getSerializedMode(text.rdd), "string")
+  ser.rdd <- serializeToBytes(text.rdd)
+  expect_equal(collectRDD(ser.rdd), as.list(mockFile))
+  expect_equal(getSerializedMode(ser.rdd), "byte")
+
+  unlink(fileName)
+})
+
+test_that("cleanClosure on R functions", {
+  # Each section builds a function f, runs cleanClosure() on it, and inspects
+  # which names ended up in the environment of the returned function.
+  y <- c(1, 2, 3)
+  g <- function(x) { x + 1 }
+  f <- function(x) { g(x) + y }
+  newF <- cleanClosure(f)
+  env <- environment(newF)
+  expect_equal(length(ls(env)), 2)  # y, g
+  actual <- get("y", envir = env, inherits = FALSE)
+  expect_equal(actual, y)
+  actual <- get("g", envir = env, inherits = FALSE)
+  expect_equal(actual, g)
+
+  # Test for nested enclosures and package variables.
+  env2 <- new.env()
+  funcEnv <- new.env(parent = env2)
+  f <- function(x) { log(g(x) + y) }
+  environment(f) <- funcEnv  # enclosing relationship: f -> funcEnv -> env2 -> 
.GlobalEnv
+  newF <- cleanClosure(f)
+  env <- environment(newF)
+  expect_equal(length(ls(env)), 2)  # "log" should not be included
+  actual <- get("y", envir = env, inherits = FALSE)
+  expect_equal(actual, y)
+  actual <- get("g", envir = env, inherits = FALSE)
+  expect_equal(actual, g)
+
+  base <- c(1, 2, 3)
+  l <- list(field = matrix(1))
+  field <- matrix(2)
+  defUse <- 3
+  g <- function(x) { x + y }
+  f <- function(x) {
+    defUse <- base::as.integer(x) + 1  # Test for access operators `::`.
+    lapply(x, g) + 1  # Test for capturing function call "g"'s closure as an 
argument of lapply.
+    l$field[1, 1] <- 3  # Test for access operators `$`.
+    res <- defUse + l$field[1, ]  # Test for def-use chain of "defUse", and "" 
symbol.
+    f(res)  # Test for recursive calls.
+  }
+  newF <- cleanClosure(f)
+  env <- environment(newF)
+  # TODO(shivaram): length(ls(env)) is 4 here for some reason and `lapply` is 
included in `env`.
+  # Disabling this test till we debug this.
+  #
+  # nolint start
+  # expect_equal(length(ls(env)), 3)  # Only "g", "l" and "f". No "base", 
"field" or "defUse".
+  # nolint end
+  expect_true("g" %in% ls(env))
+  expect_true("l" %in% ls(env))
+  expect_true("f" %in% ls(env))
+  expect_equal(get("l", envir = env, inherits = FALSE), l)
+  # "y" should be in the environment of g.
+  newG <- get("g", envir = env, inherits = FALSE)
+  env <- environment(newG)
+  expect_equal(length(ls(env)), 1)
+  actual <- get("y", envir = env, inherits = FALSE)
+  expect_equal(actual, y)
+
+  # Test for function (and variable) definitions.
+  f <- function(x) {
+    g <- function(y) { y * 2 }
+    g(x)
+  }
+  newF <- cleanClosure(f)
+  env <- environment(newF)
+  expect_equal(length(ls(env)), 0)  # "y" and "g" should not be included.
+
+  # Test for overriding variables in base namespace (Issue: SparkR-196).
+  nums <- as.list(1:10)
+  rdd <- parallelize(sc, nums, 2L)
+  t <- 4  # Override base::t in .GlobalEnv.
+  f <- function(x) { x > t }
+  newF <- cleanClosure(f)
+  env <- environment(newF)
+  expect_equal(ls(env), "t")
+  expect_equal(get("t", envir = env, inherits = FALSE), t)
+  actual <- collectRDD(lapply(rdd, f))
+  expected <- as.list(c(rep(FALSE, 4), rep(TRUE, 6)))
+  expect_equal(actual, expected)
+
+  # Test for broadcast variables.
+  a <- matrix(nrow = 10, ncol = 10, data = rnorm(100))
+  aBroadcast <- broadcastRDD(sc, a)
+  normMultiply <- function(x) { norm(aBroadcast$value) * x }
+  newnormMultiply <- SparkR:::cleanClosure(normMultiply)
+  env <- environment(newnormMultiply)
+  expect_equal(ls(env), "aBroadcast")
+  expect_equal(get("aBroadcast", envir = env, inherits = FALSE), aBroadcast)
+})
+
+test_that("varargsToJProperties", {
+  # Class checks use expect_is() (inheritance-based) rather than comparing
+  # class() with ==, which misbehaves once an object carries several classes.
+  jprops <- newJObject("java.util.Properties")
+  expect_is(jprops, "jobj")
+
+  jprops <- varargsToJProperties(abc = "123")
+  expect_is(jprops, "jobj")
+  expect_equal(callJMethod(jprops, "getProperty", "abc"), "123")
+
+  # A numeric value is expected to come back as its string form.
+  jprops <- varargsToJProperties(abc = "abc", b = 1)
+  expect_equal(callJMethod(jprops, "getProperty", "abc"), "abc")
+  expect_equal(callJMethod(jprops, "getProperty", "b"), "1")
+
+  # No arguments should produce an empty Properties object.
+  jprops <- varargsToJProperties()
+  expect_equal(callJMethod(jprops, "size"), 0L)
+})
+
+test_that("convertToJSaveMode", {
+  # A valid mode string should yield a jobj handle whose printed form names SaveMode.
+  s <- convertToJSaveMode("error")
+  expect_is(s, "jobj")
+  expect_match(capture.output(print.jobj(s)), "Java ref type org.apache.spark.sql.SaveMode id ")
+  # An unknown mode must be rejected with the list of accepted values.
+  expect_error(convertToJSaveMode("foo"),
+    'mode should be one of "append", "overwrite", "error", "ignore"') #nolint
+})
+
+test_that("captureJVMException", {
+  skip_on_cran()
+
+  method <- "createStructField"
+  expect_error(tryCatch(callJStatic("org.apache.spark.sql.api.r.SQLUtils", 
method,
+                                    "col", "unknown", TRUE),
+                        error = function(e) {
+                          captureJVMException(e, method)
+                        }),
+               "parse error - .*DataType unknown.*not supported.")
+})
+
+test_that("hashCode", {
+  skip_on_cran()
+
+  expect_error(hashCode("bc53d3605e8a5b7de1e8e271c2317645"), NA)
+})
+
+test_that("overrideEnvs", {
+  config <- new.env()
+  config[["spark.master"]] <- "foo"
+  config[["config_only"]] <- "ok"
+  param <- new.env()
+  param[["spark.master"]] <- "local"
+  param[["param_only"]] <- "blah"
+  overrideEnvs(config, param)
+  expect_equal(config[["spark.master"]], "local")
+  expect_equal(config[["param_only"]], "blah")
+  expect_equal(config[["config_only"]], "ok")
+})
+
+test_that("rbindRaws", {
+
+  # Mixed Column types
+  r <- serialize(1:5, connection = NULL)
+  r1 <- serialize(1, connection = NULL)
+  r2 <- serialize(letters, connection = NULL)
+  r3 <- serialize(1:10, connection = NULL)
+  inputData <- list(list(1L, r1, "a", r), list(2L, r2, "b", r),
+                    list(3L, r3, "c", r))
+  expected <- data.frame(V1 = 1:3)
+  expected$V2 <- list(r1, r2, r3)
+  expected$V3 <- c("a", "b", "c")
+  expected$V4 <- list(r, r, r)
+  result <- rbindRaws(inputData)
+  expect_equal(expected, result)
+
+  # Single binary column
+  input <- list(list(r1), list(r2), list(r3))
+  expected <- subset(expected, select = "V2")
+  result <- setNames(rbindRaws(input), "V2")
+  expect_equal(expected, result)
+
+})
+
+test_that("varargsToStrEnv", {
+  strenv <- varargsToStrEnv(a = 1, b = 1.1, c = TRUE, d = "abcd")
+  env <- varargsToEnv(a = "1", b = "1.1", c = "true", d = "abcd")
+  expect_equal(strenv, env)
+  expect_error(varargsToStrEnv(a = list(1, "a")),
+               paste0("Unsupported type for a : list. Supported types are 
logical, ",
+                      "numeric, character and NULL."))
+  expect_warning(varargsToStrEnv(a = 1, 2, 3, 4), "Unnamed arguments ignored: 
2, 3, 4.")
+  expect_warning(varargsToStrEnv(1, 2, 3, 4), "Unnamed arguments ignored: 1, 
2, 3, 4.")
+})
+
+test_that("basenameSansExtFromUrl", {
+  # Nightly-build URL ending in .tgz: only the final path component, minus
+  # the extension, is expected back.
+  x <- paste0("http://people.apache.org/~pwendell/spark-nightly/spark-branch-2.1-bin/spark-2.1.1-",
+              "SNAPSHOT-2016_12_09_11_08-eb2d9bf-bin/spark-2.1.1-SNAPSHOT-bin-hadoop2.7.tgz")
+  expect_equal(basenameSansExtFromUrl(x), "spark-2.1.1-SNAPSHOT-bin-hadoop2.7")
+  # Release URL ending in the two-part .tar.gz extension.
+  z <- "http://people.apache.org/~pwendell/spark-releases/spark-2.1.0--hive.tar.gz"
+  expect_equal(basenameSansExtFromUrl(z), "spark-2.1.0--hive")
+})
+
+sparkR.session.stop()
+
+message("--- End test (utils) ", as.POSIXct(Sys.time(), tz = "GMT"))
+message("elapsed ", (proc.time() - timer_ptm)[3])

http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/run-all.R
----------------------------------------------------------------------
diff --git a/R/pkg/tests/run-all.R b/R/pkg/tests/run-all.R
index f0bef4f..d48e36c 100644
--- a/R/pkg/tests/run-all.R
+++ b/R/pkg/tests/run-all.R
@@ -43,3 +43,11 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
 }
 
 test_package("SparkR")
+
+if (identical(Sys.getenv("NOT_CRAN"), "true")) {
+  # for testthat 1.0.2 or later, change reporter from "summary" to default_reporter()
+  testthat:::run_tests("SparkR",
+                       file.path(sparkRDir, "pkg", "tests", "fulltests"),
+                       NULL,
+                       "summary")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to