Repository: spark
Updated Branches:
  refs/heads/master 54e61df26 -> 5c165596d


[SPARK-19654][SPARKR][SS] Structured Streaming API for R

## What changes were proposed in this pull request?

Add "experimental" API for SS in R

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #16982 from felixcheung/rss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c165596
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c165596
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c165596

Branch: refs/heads/master
Commit: 5c165596dac136b9b3a88cfb3578b2423d227eb7
Parents: 54e61df
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Sat Mar 18 16:26:48 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sat Mar 18 16:26:48 2017 -0700

----------------------------------------------------------------------
 R/pkg/DESCRIPTION                          |   1 +
 R/pkg/NAMESPACE                            |  13 ++
 R/pkg/R/DataFrame.R                        | 104 +++++++++++-
 R/pkg/R/SQLContext.R                       |  50 ++++++
 R/pkg/R/generics.R                         |  41 ++++-
 R/pkg/R/streaming.R                        | 208 ++++++++++++++++++++++++
 R/pkg/R/utils.R                            |  11 +-
 R/pkg/inst/tests/testthat/test_streaming.R | 150 +++++++++++++++++
 8 files changed, 573 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/DESCRIPTION
----------------------------------------------------------------------
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index cc471ed..1635f71 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -54,5 +54,6 @@ Collate:
     'types.R'
     'utils.R'
     'window.R'
+    'streaming.R'
 RoxygenNote: 5.0.1
 VignetteBuilder: knitr

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/NAMESPACE
----------------------------------------------------------------------
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 871f8e4..78344ce 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -121,6 +121,7 @@ exportMethods("arrange",
               "insertInto",
               "intersect",
               "isLocal",
+              "isStreaming",
               "join",
               "limit",
               "merge",
@@ -169,6 +170,7 @@ exportMethods("arrange",
               "write.json",
               "write.orc",
               "write.parquet",
+              "write.stream",
               "write.text",
               "write.ml")
 
@@ -365,6 +367,7 @@ export("as.DataFrame",
        "read.json",
        "read.orc",
        "read.parquet",
+       "read.stream",
        "read.text",
        "spark.lapply",
        "spark.addFile",
@@ -402,6 +405,16 @@ export("partitionBy",
 export("windowPartitionBy",
        "windowOrderBy")
 
+exportClasses("StreamingQuery")
+
+export("awaitTermination",
+       "isActive",
+       "lastProgress",
+       "queryName",
+       "status",
+       "stopQuery")
+
+
 S3method(print, jobj)
 S3method(print, structField)
 S3method(print, structType)

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/DataFrame.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 97e0c9e..bc81633 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -133,9 +133,6 @@ setMethod("schema",
 #'
 #' Print the logical and physical Catalyst plans to the console for debugging.
 #'
-#' @param x a SparkDataFrame.
-#' @param extended Logical. If extended is FALSE, explain() only prints the 
physical plan.
-#' @param ... further arguments to be passed to or from other methods.
 #' @family SparkDataFrame functions
 #' @aliases explain,SparkDataFrame-method
 #' @rdname explain
@@ -3515,3 +3512,104 @@ setMethod("getNumPartitions",
           function(x) {
             callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions")
           })
+
+#' isStreaming
+#'
+#' Returns TRUE if this SparkDataFrame contains one or more sources that 
continuously return data
+#' as it arrives.
+#'
+#' @param x A SparkDataFrame
+#' @return TRUE if this SparkDataFrame is from a streaming source
+#' @family SparkDataFrame functions
+#' @aliases isStreaming,SparkDataFrame-method
+#' @rdname isStreaming
+#' @name isStreaming
+#' @seealso \link{read.stream} \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' }
+#' @note isStreaming since 2.2.0
+#' @note experimental
+setMethod("isStreaming",
+          signature(x = "SparkDataFrame"),
+          function(x) {
+            callJMethod(x@sdf, "isStreaming")
+          })
+
+#' Write the streaming SparkDataFrame to a data source.
+#'
+#' The data source is specified by the \code{source} and a set of options 
(...).
+#' If \code{source} is not specified, the default data source configured by
+#' spark.sql.sources.default will be used.
+#'
+#' Additionally, \code{outputMode} specifies how data of a streaming 
SparkDataFrame is written to an
+#' output data source. There are three modes:
+#' \itemize{
+#'   \item append: Only the new rows in the streaming SparkDataFrame will be 
written out. This
+#'                 output mode can only be used in queries that do not 
contain any aggregation.
+#'   \item complete: All the rows in the streaming SparkDataFrame will be 
written out every time
+#'                   there are some updates. This output mode can only be used 
in queries that
+#'                   contain aggregations.
+#'   \item update: Only the rows that were updated in the streaming 
SparkDataFrame will be written
+#'                 out every time there are some updates. If the query doesn't 
contain aggregations,
+#'                 it will be equivalent to \code{append} mode.
+#' }
+#'
+#' @param df a streaming SparkDataFrame.
+#' @param source a name for external data source.
+#' @param outputMode one of 'append', 'complete', 'update'.
+#' @param ... additional argument(s) passed to the method.
+#'
+#' @family SparkDataFrame functions
+#' @seealso \link{read.stream}
+#' @aliases write.stream,SparkDataFrame-method
+#' @rdname write.stream
+#' @name write.stream
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' isStreaming(df)
+#' wordCounts <- count(group_by(df, "value"))
+#'
+#' # console
+#' q <- write.stream(wordCounts, "console", outputMode = "complete")
+#' # text stream
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = 
"/home/user/cp")
+#' # memory stream
+#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = 
"complete")
+#' head(sql("SELECT * from outs"))
+#' queryName(q)
+#'
+#' stopQuery(q)
+#' }
+#' @note write.stream since 2.2.0
+#' @note experimental
+setMethod("write.stream",
+          signature(df = "SparkDataFrame"),
+          function(df, source = NULL, outputMode = NULL, ...) {
+            # Validate arguments up front so bad input fails before any JVM call.
+            if (!is.null(source) && !is.character(source)) {
+              stop("source should be character, NULL or omitted. It is the 
data source specified ",
+                   "in 'spark.sql.sources.default' configuration by default.")
+            }
+            if (!is.null(outputMode) && !is.character(outputMode)) {
+              stop("outputMode should be character or omitted.")
+            }
+            # Fall back to the default data source when none is given.
+            if (is.null(source)) {
+              source <- getDefaultSqlSource()
+            }
+            # Remaining named arguments are passed on as writer options.
+            options <- varargsToStrEnv(...)
+            write <- handledCallJMethod(df@sdf, "writeStream")
+            write <- callJMethod(write, "format", source)
+            # Only set an output mode when the caller supplied one.
+            if (!is.null(outputMode)) {
+              write <- callJMethod(write, "outputMode", outputMode)
+            }
+            write <- callJMethod(write, "options", options)
+            ssq <- handledCallJMethod(write, "start")
+            # Wrap the returned Java reference in the R StreamingQuery class.
+            streamingQuery(ssq)
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/SQLContext.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index 8354f70..b75fb01 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -937,3 +937,53 @@ read.jdbc <- function(url, tableName,
   }
   dataFrame(sdf)
 }
+
+#' Load a streaming SparkDataFrame
+#'
+#' Returns the dataset in a data source as a SparkDataFrame
+#'
+#' The data source is specified by the \code{source} and a set of options(...).
+#' If \code{source} is not specified, the default data source configured by
+#' "spark.sql.sources.default" will be used.
+#'
+#' @param source The name of external data source
+#' @param schema The data schema defined in structType, this is required for 
file-based streaming
+#'               data source
+#' @param ... additional external data source specific named options, for 
instance \code{path} for
+#'        file-based streaming data source
+#' @return SparkDataFrame
+#' @rdname read.stream
+#' @name read.stream
+#' @seealso \link{write.stream}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df <- read.stream("socket", host = "localhost", port = 9999)
+#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = 
"/home/user/cp")
+#'
+#' df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+#' }
+#' @name read.stream
+#' @note read.stream since 2.2.0
+#' @note experimental
+read.stream <- function(source = NULL, schema = NULL, ...) {
+  sparkSession <- getSparkSession()
+  # Validate arguments up front so bad input fails before any JVM call.
+  if (!is.null(source) && !is.character(source)) {
+    stop("source should be character, NULL or omitted. It is the data source 
specified ",
+         "in 'spark.sql.sources.default' configuration by default.")
+  }
+  # Fall back to the default data source when none is given.
+  if (is.null(source)) {
+    source <- getDefaultSqlSource()
+  }
+  # Remaining named arguments are passed on as reader options.
+  options <- varargsToStrEnv(...)
+  read <- callJMethod(sparkSession, "readStream")
+  read <- callJMethod(read, "format", source)
+  if (!is.null(schema)) {
+    # inherits() yields a single TRUE/FALSE even if the object carries
+    # more than one class, unlike comparing class() with ==.
+    stopifnot(inherits(schema, "structType"))
+    read <- callJMethod(read, "schema", schema$jobj)
+  }
+  read <- callJMethod(read, "options", options)
+  sdf <- handledCallJMethod(read, "load")
+  dataFrame(callJMethod(sdf, "toDF"))
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 45bc127..0297712 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -539,6 +539,9 @@ setGeneric("dtypes", function(x) { 
standardGeneric("dtypes") })
 
 #' @rdname explain
 #' @export
+#' @param x a SparkDataFrame or a StreamingQuery.
+#' @param extended Logical. If extended is FALSE, prints only the physical 
plan.
+#' @param ... further arguments to be passed to or from other methods.
 setGeneric("explain", function(x, ...) { standardGeneric("explain") })
 
 #' @rdname except
@@ -577,6 +580,10 @@ setGeneric("intersect", function(x, y) { 
standardGeneric("intersect") })
 #' @export
 setGeneric("isLocal", function(x) { standardGeneric("isLocal") })
 
+#' @rdname isStreaming
+#' @export
+setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") })
+
 #' @rdname limit
 #' @export
 setGeneric("limit", function(x, num) {standardGeneric("limit") })
@@ -682,6 +689,12 @@ setGeneric("write.parquet", function(x, path, ...) {
 #' @export
 setGeneric("saveAsParquetFile", function(x, path) { 
standardGeneric("saveAsParquetFile") })
 
+#' @rdname write.stream
+#' @export
+setGeneric("write.stream", function(df, source = NULL, outputMode = NULL, ...) 
{
+  standardGeneric("write.stream")
+})
+
 #' @rdname write.text
 #' @export
 setGeneric("write.text", function(x, path, ...) { 
standardGeneric("write.text") })
@@ -1428,10 +1441,36 @@ setGeneric("spark.posterior", function(object, newData) 
{ standardGeneric("spark
 #' @export
 setGeneric("spark.perplexity", function(object, data) { 
standardGeneric("spark.perplexity") })
 
-
 #' @param object a fitted ML model object.
 #' @param path the directory where the model is saved.
 #' @param ... additional argument(s) passed to the method.
 #' @rdname write.ml
 #' @export
 setGeneric("write.ml", function(object, path, ...) { 
standardGeneric("write.ml") })
+
+
+###################### Streaming Methods ##########################
+
+#' @rdname awaitTermination
+#' @export
+setGeneric("awaitTermination", function(x, timeout) { 
standardGeneric("awaitTermination") })
+
+#' @rdname isActive
+#' @export
+setGeneric("isActive", function(x) { standardGeneric("isActive") })
+
+#' @rdname lastProgress
+#' @export
+setGeneric("lastProgress", function(x) { standardGeneric("lastProgress") })
+
+#' @rdname queryName
+#' @export
+setGeneric("queryName", function(x) { standardGeneric("queryName") })
+
+#' @rdname status
+#' @export
+setGeneric("status", function(x) { standardGeneric("status") })
+
+#' @rdname stopQuery
+#' @export
+setGeneric("stopQuery", function(x) { standardGeneric("stopQuery") })

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/streaming.R b/R/pkg/R/streaming.R
new file mode 100644
index 0000000..e353d2d
--- /dev/null
+++ b/R/pkg/R/streaming.R
@@ -0,0 +1,208 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# streaming.R - Structured Streaming / StreamingQuery class and methods 
implemented in S4 OO classes
+
+#' @include generics.R jobj.R
+NULL
+
+#' S4 class that represents a StreamingQuery
+#'
+#' StreamingQuery can be created by using read.stream() and write.stream()
+#'
+#' @rdname StreamingQuery
+#' @seealso \link{read.stream}
+#'
+#' @param ssq A Java object reference to the backing Scala StreamingQuery
+#' @export
+#' @note StreamingQuery since 2.2.0
+#' @note experimental
+setClass("StreamingQuery",
+         slots = list(ssq = "jobj"))
+
+setMethod("initialize", "StreamingQuery", function(.Object, ssq) {
+  .Object@ssq <- ssq
+  .Object
+})
+
+# Internal constructor: wraps a Java object reference in a StreamingQuery.
+streamingQuery <- function(ssq) {
+  # inherits() yields a single TRUE/FALSE even if the reference carries
+  # more than one class, unlike comparing class() with ==.
+  stopifnot(inherits(ssq, "jobj"))
+  new("StreamingQuery", ssq)
+}
+
+#' @rdname show
+#' @export
+#' @note show(StreamingQuery) since 2.2.0
+setMethod("show", "StreamingQuery",
+          function(object) {
+            name <- callJMethod(object@ssq, "name")
+            if (!is.null(name)) {
+              cat(paste0("StreamingQuery '", name, "'\n"))
+            } else {
+              cat("StreamingQuery", "\n")
+            }
+          })
+
+#' queryName
+#'
+#' Returns the user-specified name of the query. This is specified in
+#' \code{write.stream(df, queryName = "query")}. This name, if set, must be 
unique across all active
+#' queries.
+#'
+#' @param x a StreamingQuery.
+#' @return The name of the query, or NULL if not specified.
+#' @rdname queryName
+#' @name queryName
+#' @aliases queryName,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @seealso \link{write.stream}
+#' @export
+#' @examples
+#' \dontrun{ queryName(sq) }
+#' @note queryName(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("queryName",
+          signature(x = "StreamingQuery"),
+          function(x) {
+            callJMethod(x@ssq, "name")
+          })
+
+#' @rdname explain
+#' @name explain
+#' @aliases explain,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ explain(sq) }
+#' @note explain(StreamingQuery) since 2.2.0
+setMethod("explain",
+          signature(x = "StreamingQuery"),
+          function(x, extended = FALSE) {
+            cat(callJMethod(x@ssq, "explainInternal", extended), "\n")
+          })
+
+#' lastProgress
+#'
+#' Prints the most recent progress update of this streaming query in JSON 
format.
+#'
+#' @param x a StreamingQuery.
+#' @rdname lastProgress
+#' @name lastProgress
+#' @aliases lastProgress,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ lastProgress(sq) }
+#' @note lastProgress(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("lastProgress",
+          signature(x = "StreamingQuery"),
+          function(x) {
+            p <- callJMethod(x@ssq, "lastProgress")
+            if (is.null(p)) {
+              # End with a newline, as the other branch does, so the
+              # console prompt does not run on after the message.
+              cat("Streaming query has no progress\n")
+            } else {
+              cat(callJMethod(p, "toString"), "\n")
+            }
+          })
+
+#' status
+#'
+#' Prints the current status of the query in JSON format.
+#'
+#' @param x a StreamingQuery.
+#' @rdname status
+#' @name status
+#' @aliases status,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ status(sq) }
+#' @note status(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("status",
+          signature(x = "StreamingQuery"),
+          function(x) {
+            cat(callJMethod(callJMethod(x@ssq, "status"), "toString"), "\n")
+          })
+
+#' isActive
+#'
+#' Returns TRUE if this query is actively running.
+#'
+#' @param x a StreamingQuery.
+#' @return TRUE if query is actively running, FALSE if stopped.
+#' @rdname isActive
+#' @name isActive
+#' @aliases isActive,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ isActive(sq) }
+#' @note isActive(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("isActive",
+          signature(x = "StreamingQuery"),
+          function(x) {
+            callJMethod(x@ssq, "isActive")
+          })
+
+#' awaitTermination
+#'
+#' Waits for the termination of the query, either by \code{stopQuery} or by an 
error.
+#'
+#' If the query has terminated, then all subsequent calls to this method will 
return TRUE
+#' immediately.
+#'
+#' @param x a StreamingQuery.
+#' @param timeout time to wait in milliseconds
+#' @return TRUE if query has terminated within the timeout period.
+#' @rdname awaitTermination
+#' @name awaitTermination
+#' @aliases awaitTermination,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ awaitTermination(sq, 10000) }
+#' @note awaitTermination(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("awaitTermination",
+          signature(x = "StreamingQuery"),
+          function(x, timeout) {
+            handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
+          })
+
+#' stopQuery
+#'
+#' Stops the execution of this query if it is running. This method blocks 
until the execution is
+#' stopped.
+#'
+#' @param x a StreamingQuery.
+#' @rdname stopQuery
+#' @name stopQuery
+#' @aliases stopQuery,StreamingQuery-method
+#' @family StreamingQuery methods
+#' @export
+#' @examples
+#' \dontrun{ stopQuery(sq) }
+#' @note stopQuery(StreamingQuery) since 2.2.0
+#' @note experimental
+setMethod("stopQuery",
+          signature(x = "StreamingQuery"),
+          function(x) {
+            invisible(callJMethod(x@ssq, "stop"))
+          })

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/utils.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R
index 1f7848f..810de99 100644
--- a/R/pkg/R/utils.R
+++ b/R/pkg/R/utils.R
@@ -823,7 +823,16 @@ captureJVMException <- function(e, method) {
     stacktrace <- rawmsg
   }
 
-  if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
+  # StreamingQueryException could wrap an IllegalArgumentException, so look 
for that first
+  if (any(grep("org.apache.spark.sql.streaming.StreamingQueryException: ", 
stacktrace))) {
+    msg <- strsplit(stacktrace, 
"org.apache.spark.sql.streaming.StreamingQueryException: ",
+                    fixed = TRUE)[[1]]
+    # Extract "Error in ..." message.
+    rmsg <- msg[1]
+    # Extract the first message of JVM exception.
+    first <- strsplit(msg[2], "\r?\n\tat")[[1]][1]
+    stop(paste0(rmsg, "streaming query error - ", first), call. = FALSE)
+  } else if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) {
     msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed 
= TRUE)[[1]]
     # Extract "Error in ..." message.
     rmsg <- msg[1]

http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/inst/tests/testthat/test_streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R 
b/R/pkg/inst/tests/testthat/test_streaming.R
new file mode 100644
index 0000000..03b1bd3
--- /dev/null
+++ b/R/pkg/inst/tests/testthat/test_streaming.R
@@ -0,0 +1,150 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+library(testthat)
+
+context("Structured Streaming")
+
+# Tests for Structured Streaming functions in SparkR
+
+sparkSession <- sparkR.session(enableHiveSupport = FALSE)
+
+jsonSubDir <- file.path("sparkr-test", "json", "")
+if (.Platform$OS.type == "windows") {
+  # file.path removes the empty separator on Windows, adds it back
+  jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep)
+}
+jsonDir <- file.path(tempdir(), jsonSubDir)
+dir.create(jsonDir, recursive = TRUE)
+
+mockLines <- c("{\"name\":\"Michael\"}",
+               "{\"name\":\"Andy\", \"age\":30}",
+               "{\"name\":\"Justin\", \"age\":19}")
+jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+writeLines(mockLines, jsonPath)
+
+mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}",
+                 "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}",
+                 "{\"name\":\"David\",\"age\":60,\"height\":null}")
+jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp")
+
+schema <- structType(structField("name", "string"),
+                     structField("age", "integer"),
+                     structField("count", "double"))
+
+test_that("read.stream, write.stream, awaitTermination, stopQuery", {
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3)
+
+  writeLines(mockLinesNa, jsonPathNa)
+  awaitTermination(q, 5 * 1000)
+  expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6)
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+})
+
+test_that("print from explain, lastProgress, status, isActive", {
+  df <- read.stream("json", path = jsonDir, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people2", outputMode = 
"complete")
+
+  awaitTermination(q, 5 * 1000)
+
+  expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==")
+  expect_true(any(grepl("\"description\" : \"MemorySink\"", 
capture.output(lastProgress(q)))))
+  expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q)))))
+
+  expect_equal(queryName(q), "people2")
+  expect_true(isActive(q))
+
+  stopQuery(q)
+})
+
+test_that("Stream other format", {
+  # Write the mock JSON data out as parquet, then stream it back in.
+  parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
+  df <- read.df(jsonPath, "json", schema)
+  write.df(df, parquetPath, "parquet", "overwrite")
+
+  df <- read.stream(path = parquetPath, schema = schema)
+  expect_true(isStreaming(df))
+  counts <- count(group_by(df, "name"))
+  q <- write.stream(counts, "memory", queryName = "people3", outputMode = 
"complete")
+
+  expect_false(awaitTermination(q, 5 * 1000))
+  expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3)
+
+  expect_equal(queryName(q), "people3")
+  expect_true(any(grepl("\"description\" : 
\"FileStreamSource[[:print:]]+parquet",
+              capture.output(lastProgress(q)))))
+  expect_true(isActive(q))
+
+  stopQuery(q)
+  expect_true(awaitTermination(q, 1))
+  expect_false(isActive(q))
+
+  # Spark writes parquet output as a directory; unlink() leaves
+  # directories in place unless recursive = TRUE.
+  unlink(parquetPath, recursive = TRUE)
+})
+
+test_that("Non-streaming DataFrame", {
+  c <- as.DataFrame(cars)
+  expect_false(isStreaming(c))
+
+  expect_error(write.stream(c, "memory", queryName = "people", outputMode = 
"complete"),
+               paste0(".*(writeStream : analysis error - 'writeStream' can be 
called only on ",
+                      "streaming Dataset/DataFrame).*"))
+})
+
+test_that("Unsupported operation", {
+  # memory sink without aggregation
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = 1)
+  expect_error(write.stream(df, "memory", queryName = "people", outputMode = 
"complete"),
+               paste0(".*(start : analysis error - Complete output mode not 
supported when there ",
+                      "are no streaming aggregations on streaming 
DataFrames/Datasets).*"))
+})
+
+test_that("Terminated by error", {
+  df <- read.stream("json", path = jsonDir, schema = schema, 
maxFilesPerTrigger = -1)
+  counts <- count(group_by(df, "name"))
+  # This would not fail before returning with a StreamingQuery,
+  # but could dump error log at just about the same time
+  expect_error(q <- write.stream(counts, "memory", queryName = "people4", 
outputMode = "complete"),
+               NA)
+
+  expect_error(awaitTermination(q, 1),
+               paste0(".*(awaitTermination : streaming query error - Invalid 
value '-1' for option",
+                      " 'maxFilesPerTrigger', must be a positive integer).*"))
+
+  expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid 
value",
+              capture.output(status(q)))))
+  expect_true(any(grepl("Streaming query has no progress", 
capture.output(lastProgress(q)))))
+  expect_equal(queryName(q), "people4")
+  expect_false(isActive(q))
+
+  stopQuery(q)
+})
+
+unlink(jsonPath)
+unlink(jsonPathNa)
+
+sparkR.session.stop()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to