[1/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 815a0820b -> 0b0be47e7


http://git-wip-us.apache.org/repos/asf/spark/blob/0b0be47e/R/pkg/tests/fulltests/test_streaming.R
--
diff --git a/R/pkg/tests/fulltests/test_streaming.R b/R/pkg/tests/fulltests/test_streaming.R
new file mode 100644
index 0000000..b20b431
--- /dev/null
+++ b/R/pkg/tests/fulltests/test_streaming.R
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("Structured Streaming")
+
+# Tests for Structured Streaming functions in SparkR
+
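+# Start a SparkSession for these tests; sparkRTestMaster is set by the package test runner.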
+sparkSession <- sparkR.session(master = sparkRTestMaster, enableHiveSupport = FALSE)
+
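+# Stage a temporary directory of JSON files to act as a file-based streaming source.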
+jsonSubDir <- file.path("sparkr-test", "json", "")
+if (.Platform$OS.type == "windows") {
+  # file.path removes the empty separator on Windows, adds it back
+  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
+}
+jsonDir <- file.path(tempdir(), jsonSubDir)
+dir.create(jsonDir, recursive = TRUE)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+               "{\"name\":\"Andy\", \"age\":30}",
+               "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+writeLines(mockLines, jsonPath)
+
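+# A second batch of records (including nulls) that the tests write later, so the
+# running query can pick up newly arriving files in the watched directory.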
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+                 "{\"name\":\"David\",\"age\":60,\"height\":null}")
+jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+
+schema <- structType(structField("name", "string"),
+                     structField("age", "integer"),
+                     structField("count", "double"))
+
+test_that("read.stream, write.stream, awaitTermination, stopQuery", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
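+  # processAllAvailable() on the underlying JVM StreamingQuery blocks until all
+  # input available so far has been processed, making the count checks deterministic.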
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
+
+  writeLines(mockLinesNa, jsonPathNa)
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
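+  # expect_error(expr, NA) is testthat's idiom for asserting that no error is raised.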
+  expect_error(awaitTermination(q), NA)
+})
+
+test_that("print from explain, lastProgress, status, isActive", {
+  skip_on_cran()
+
+  df <- read.stream("json", path = jsonDir, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete")
+
+  awaitTermination(q, 5 * 1000)
+  callJMethod(q@ssq, "processAllAvailable")
+
+  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
+  expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q)))))
+  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
+
+  expect_equal(queryName(q), "people2")
+  expect_true(isActive(q))
+
+  stopQuery(q)
+})
+
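+# Same lifecycle as above, but reading from a parquet directory rather than JSON.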
+test_that("Stream other format", {
+  skip_on_cran()
+
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  df <- read.df(jsonPath, "json", schema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  callJMethod(q@ssq, "processAllAvailable")
+  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+
+  expect_equal(queryName(q), "people3")
+  expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet",
+                        capture.output(lastProgress(q)))))
+  expect_true(isActive(q))
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_false(isActive(q))
+
+  unlink(parquetPath)
+})
+
+test_that("Non-streaming DataFrame", {
+  skip_on_cran()
+
+  c <- as.DataFrame(cars)
+ 

[1/7] spark git commit: [SPARK-20877][SPARKR] refactor tests to basic tests only for CRAN

2017-06-11 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 5301a19a0 -> dc4c35183


http://git-wip-us.apache.org/repos/asf/spark/blob/dc4c3518/R/pkg/tests/fulltests/test_streaming.R