Repository: spark Updated Branches: refs/heads/master 54e61df26 -> 5c165596d
[SPARK-19654][SPARKR][SS] Structured Streaming API for R ## What changes were proposed in this pull request? Add "experimental" API for SS in R ## How was this patch tested? manual, unit tests Author: Felix Cheung <felixcheun...@hotmail.com> Closes #16982 from felixcheung/rss. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5c165596 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5c165596 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5c165596 Branch: refs/heads/master Commit: 5c165596dac136b9b3a88cfb3578b2423d227eb7 Parents: 54e61df Author: Felix Cheung <felixcheun...@hotmail.com> Authored: Sat Mar 18 16:26:48 2017 -0700 Committer: Felix Cheung <felixche...@apache.org> Committed: Sat Mar 18 16:26:48 2017 -0700 ---------------------------------------------------------------------- R/pkg/DESCRIPTION | 1 + R/pkg/NAMESPACE | 13 ++ R/pkg/R/DataFrame.R | 104 +++++++++++- R/pkg/R/SQLContext.R | 50 ++++++ R/pkg/R/generics.R | 41 ++++- R/pkg/R/streaming.R | 208 ++++++++++++++++++++++++ R/pkg/R/utils.R | 11 +- R/pkg/inst/tests/testthat/test_streaming.R | 150 +++++++++++++++++ 8 files changed, 573 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/DESCRIPTION ---------------------------------------------------------------------- diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION index cc471ed..1635f71 100644 --- a/R/pkg/DESCRIPTION +++ b/R/pkg/DESCRIPTION @@ -54,5 +54,6 @@ Collate: 'types.R' 'utils.R' 'window.R' + 'streaming.R' RoxygenNote: 5.0.1 VignetteBuilder: knitr http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/NAMESPACE ---------------------------------------------------------------------- diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index 871f8e4..78344ce 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -121,6 +121,7 @@ exportMethods("arrange", "insertInto", "intersect", "isLocal", + "isStreaming", "join", "limit", "merge", @@ -169,6 +170,7 @@ exportMethods("arrange", "write.json", "write.orc", "write.parquet", + "write.stream", "write.text", "write.ml") @@ -365,6 +367,7 @@ export("as.DataFrame", "read.json", "read.orc", "read.parquet", + "read.stream", "read.text", "spark.lapply", "spark.addFile", @@ -402,6 +405,16 @@ export("partitionBy", export("windowPartitionBy", "windowOrderBy") +exportClasses("StreamingQuery") + +export("awaitTermination", + "isActive", + "lastProgress", + "queryName", + "status", + "stopQuery") + + S3method(print, jobj) S3method(print, structField) S3method(print, structType) http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/DataFrame.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 97e0c9e..bc81633 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -133,9 +133,6 @@ setMethod("schema", #' #' Print the logical and physical Catalyst plans to the console for debugging. #' -#' @param x a SparkDataFrame. -#' @param extended Logical. If extended is FALSE, explain() only prints the physical plan. -#' @param ... further arguments to be passed to or from other methods. #' @family SparkDataFrame functions #' @aliases explain,SparkDataFrame-method #' @rdname explain @@ -3515,3 +3512,104 @@ setMethod("getNumPartitions", function(x) { callJMethod(callJMethod(x@sdf, "rdd"), "getNumPartitions") }) + +#' isStreaming +#' +#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data +#' as it arrives. +#' +#' @param x A SparkDataFrame +#' @return TRUE if this SparkDataFrame is from a streaming source +#' @family SparkDataFrame functions +#' @aliases isStreaming,SparkDataFrame-method +#' @rdname isStreaming +#' @name isStreaming +#' @seealso \link{read.stream} \link{write.stream} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df <- read.stream("socket", host = "localhost", port = 9999) +#' isStreaming(df) +#' } +#' @note isStreaming since 2.2.0 +#' @note experimental +setMethod("isStreaming", + signature(x = "SparkDataFrame"), + function(x) { + callJMethod(x@sdf, "isStreaming") + }) + +#' Write the streaming SparkDataFrame to a data source. +#' +#' The data source is specified by the \code{source} and a set of options (...). +#' If \code{source} is not specified, the default data source configured by +#' spark.sql.sources.default will be used. +#' +#' Additionally, \code{outputMode} specifies how data of a streaming SparkDataFrame is written to a +#' output data source. There are three modes: +#' \itemize{ +#' \item append: Only the new rows in the streaming SparkDataFrame will be written out. This +#' output mode can be only be used in queries that do not contain any aggregation. +#' \item complete: All the rows in the streaming SparkDataFrame will be written out every time +#' there are some updates. This output mode can only be used in queries that +#' contain aggregations. +#' \item update: Only the rows that were updated in the streaming SparkDataFrame will be written +#' out every time there are some updates. If the query doesn't contain aggregations, +#' it will be equivalent to \code{append} mode. +#' } +#' +#' @param df a streaming SparkDataFrame. +#' @param source a name for external data source. +#' @param outputMode one of 'append', 'complete', 'update'. +#' @param ... additional argument(s) passed to the method. +#' +#' @family SparkDataFrame functions +#' @seealso \link{read.stream} +#' @aliases write.stream,SparkDataFrame-method +#' @rdname write.stream +#' @name write.stream +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df <- read.stream("socket", host = "localhost", port = 9999) +#' isStreaming(df) +#' wordCounts <- count(group_by(df, "value")) +#' +#' # console +#' q <- write.stream(wordCounts, "console", outputMode = "complete") +#' # text stream +#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp") +#' # memory stream +#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete") +#' head(sql("SELECT * from outs")) +#' queryName(q) +#' +#' stopQuery(q) +#' } +#' @note write.stream since 2.2.0 +#' @note experimental +setMethod("write.stream", + signature(df = "SparkDataFrame"), + function(df, source = NULL, outputMode = NULL, ...) { + if (!is.null(source) && !is.character(source)) { + stop("source should be character, NULL or omitted. It is the data source specified ", + "in 'spark.sql.sources.default' configuration by default.") + } + if (!is.null(outputMode) && !is.character(outputMode)) { + stop("outputMode should be charactor or omitted.") + } + if (is.null(source)) { + source <- getDefaultSqlSource() + } + options <- varargsToStrEnv(...) + write <- handledCallJMethod(df@sdf, "writeStream") + write <- callJMethod(write, "format", source) + if (!is.null(outputMode)) { + write <- callJMethod(write, "outputMode", outputMode) + } + write <- callJMethod(write, "options", options) + ssq <- handledCallJMethod(write, "start") + streamingQuery(ssq) + }) http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/SQLContext.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R index 8354f70..b75fb01 100644 --- a/R/pkg/R/SQLContext.R +++ b/R/pkg/R/SQLContext.R @@ -937,3 +937,53 @@ read.jdbc <- function(url, tableName, } dataFrame(sdf) } + +#' Load a streaming SparkDataFrame +#' +#' Returns the dataset in a data source as a SparkDataFrame +#' +#' The data source is specified by the \code{source} and a set of options(...). +#' If \code{source} is not specified, the default data source configured by +#' "spark.sql.sources.default" will be used. +#' +#' @param source The name of external data source +#' @param schema The data schema defined in structType, this is required for file-based streaming +#' data source +#' @param ... additional external data source specific named options, for instance \code{path} for +#' file-based streaming data source +#' @return SparkDataFrame +#' @rdname read.stream +#' @name read.stream +#' @seealso \link{write.stream} +#' @export +#' @examples +#'\dontrun{ +#' sparkR.session() +#' df <- read.stream("socket", host = "localhost", port = 9999) +#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp") +#' +#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) +#' } +#' @name read.stream +#' @note read.stream since 2.2.0 +#' @note experimental +read.stream <- function(source = NULL, schema = NULL, ...) { + sparkSession <- getSparkSession() + if (!is.null(source) && !is.character(source)) { + stop("source should be character, NULL or omitted. It is the data source specified ", + "in 'spark.sql.sources.default' configuration by default.") + } + if (is.null(source)) { + source <- getDefaultSqlSource() + } + options <- varargsToStrEnv(...) + read <- callJMethod(sparkSession, "readStream") + read <- callJMethod(read, "format", source) + if (!is.null(schema)) { + stopifnot(class(schema) == "structType") + read <- callJMethod(read, "schema", schema$jobj) + } + read <- callJMethod(read, "options", options) + sdf <- handledCallJMethod(read, "load") + dataFrame(callJMethod(sdf, "toDF")) +} http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/generics.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 45bc127..0297712 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -539,6 +539,9 @@ setGeneric("dtypes", function(x) { standardGeneric("dtypes") }) #' @rdname explain #' @export +#' @param x a SparkDataFrame or a StreamingQuery. +#' @param extended Logical. If extended is FALSE, prints only the physical plan. +#' @param ... further arguments to be passed to or from other methods. setGeneric("explain", function(x, ...) { standardGeneric("explain") }) #' @rdname except @@ -577,6 +580,10 @@ setGeneric("intersect", function(x, y) { standardGeneric("intersect") }) #' @export setGeneric("isLocal", function(x) { standardGeneric("isLocal") }) +#' @rdname isStreaming +#' @export +setGeneric("isStreaming", function(x) { standardGeneric("isStreaming") }) + #' @rdname limit #' @export setGeneric("limit", function(x, num) {standardGeneric("limit") }) @@ -682,6 +689,12 @@ setGeneric("write.parquet", function(x, path, ...) { #' @export setGeneric("saveAsParquetFile", function(x, path) { standardGeneric("saveAsParquetFile") }) +#' @rdname write.stream +#' @export +setGeneric("write.stream", function(df, source = NULL, outputMode = NULL, ...) { + standardGeneric("write.stream") +}) + #' @rdname write.text #' @export setGeneric("write.text", function(x, path, ...) { standardGeneric("write.text") }) @@ -1428,10 +1441,36 @@ setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark #' @export setGeneric("spark.perplexity", function(object, data) { standardGeneric("spark.perplexity") }) - #' @param object a fitted ML model object. #' @param path the directory where the model is saved. #' @param ... additional argument(s) passed to the method. #' @rdname write.ml #' @export setGeneric("write.ml", function(object, path, ...) { standardGeneric("write.ml") }) + + +###################### Streaming Methods ########################## + +#' @rdname awaitTermination +#' @export +setGeneric("awaitTermination", function(x, timeout) { standardGeneric("awaitTermination") }) + +#' @rdname isActive +#' @export +setGeneric("isActive", function(x) { standardGeneric("isActive") }) + +#' @rdname lastProgress +#' @export +setGeneric("lastProgress", function(x) { standardGeneric("lastProgress") }) + +#' @rdname queryName +#' @export +setGeneric("queryName", function(x) { standardGeneric("queryName") }) + +#' @rdname status +#' @export +setGeneric("status", function(x) { standardGeneric("status") }) + +#' @rdname stopQuery +#' @export +setGeneric("stopQuery", function(x) { standardGeneric("stopQuery") }) http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/streaming.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/streaming.R b/R/pkg/R/streaming.R new file mode 100644 index 0000000..e353d2d --- /dev/null +++ b/R/pkg/R/streaming.R @@ -0,0 +1,208 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# streaming.R - Structured Streaming / StreamingQuery class and methods implemented in S4 OO classes + +#' @include generics.R jobj.R +NULL + +#' S4 class that represents a StreamingQuery +#' +#' StreamingQuery can be created by using read.stream() and write.stream() +#' +#' @rdname StreamingQuery +#' @seealso \link{read.stream} +#' +#' @param ssq A Java object reference to the backing Scala StreamingQuery +#' @export +#' @note StreamingQuery since 2.2.0 +#' @note experimental +setClass("StreamingQuery", + slots = list(ssq = "jobj")) + +setMethod("initialize", "StreamingQuery", function(.Object, ssq) { + .Object@ssq <- ssq + .Object +}) + +streamingQuery <- function(ssq) { + stopifnot(class(ssq) == "jobj") + new("StreamingQuery", ssq) +} + +#' @rdname show +#' @export +#' @note show(StreamingQuery) since 2.2.0 +setMethod("show", "StreamingQuery", + function(object) { + name <- callJMethod(object@ssq, "name") + if (!is.null(name)) { + cat(paste0("StreamingQuery '", name, "'\n")) + } else { + cat("StreamingQuery", "\n") + } + }) + +#' queryName +#' +#' Returns the user-specified name of the query. This is specified in +#' \code{write.stream(df, queryName = "query")}. This name, if set, must be unique across all active +#' queries. +#' +#' @param x a StreamingQuery. +#' @return The name of the query, or NULL if not specified. +#' @rdname queryName +#' @name queryName +#' @aliases queryName,StreamingQuery-method +#' @family StreamingQuery methods +#' @seealso \link{write.stream} +#' @export +#' @examples +#' \dontrun{ queryName(sq) } +#' @note queryName(StreamingQuery) since 2.2.0 +#' @note experimental +setMethod("queryName", + signature(x = "StreamingQuery"), + function(x) { + callJMethod(x@ssq, "name") + }) + +#' @rdname explain +#' @name explain +#' @aliases explain,StreamingQuery-method +#' @family StreamingQuery methods +#' @export +#' @examples +#' \dontrun{ explain(sq) } +#' @note explain(StreamingQuery) since 2.2.0 +setMethod("explain", + signature(x = "StreamingQuery"), + function(x, extended = FALSE) { + cat(callJMethod(x@ssq, "explainInternal", extended), "\n") + }) + +#' lastProgress +#' +#' Prints the most recent progess update of this streaming query in JSON format. +#' +#' @param x a StreamingQuery. +#' @rdname lastProgress +#' @name lastProgress +#' @aliases lastProgress,StreamingQuery-method +#' @family StreamingQuery methods +#' @export +#' @examples +#' \dontrun{ lastProgress(sq) } +#' @note lastProgress(StreamingQuery) since 2.2.0 +#' @note experimental +setMethod("lastProgress", + signature(x = "StreamingQuery"), + function(x) { + p <- callJMethod(x@ssq, "lastProgress") + if (is.null(p)) { + cat("Streaming query has no progress") + } else { + cat(callJMethod(p, "toString"), "\n") + } + }) + +#' status +#' +#' Prints the current status of the query in JSON format. +#' +#' @param x a StreamingQuery. +#' @rdname status +#' @name status +#' @aliases status,StreamingQuery-method +#' @family StreamingQuery methods +#' @export +#' @examples +#' \dontrun{ status(sq) } +#' @note status(StreamingQuery) since 2.2.0 +#' @note experimental +setMethod("status", + signature(x = "StreamingQuery"), + function(x) { + cat(callJMethod(callJMethod(x@ssq, "status"), "toString"), "\n") + }) + +#' isActive +#' +#' Returns TRUE if this query is actively running. +#' +#' @param x a StreamingQuery. +#' @return TRUE if query is actively running, FALSE if stopped. +#' @rdname isActive +#' @name isActive +#' @aliases isActive,StreamingQuery-method +#' @family StreamingQuery methods +#' @export +#' @examples +#' \dontrun{ isActive(sq) } +#' @note isActive(StreamingQuery) since 2.2.0 +#' @note experimental +setMethod("isActive", + signature(x = "StreamingQuery"), + function(x) { + callJMethod(x@ssq, "isActive") + }) + +#' awaitTermination +#' +#' Waits for the termination of the query, either by \code{stopQuery} or by an error. +#' +#' If the query has terminated, then all subsequent calls to this method will return TRUE +#' immediately. +#' +#' @param x a StreamingQuery. +#' @param timeout time to wait in milliseconds +#' @return TRUE if query has terminated within the timeout period. +#' @rdname awaitTermination +#' @name awaitTermination +#' @aliases awaitTermination,StreamingQuery-method +#' @family StreamingQuery methods +#' @export +#' @examples +#' \dontrun{ awaitTermination(sq, 10000) } +#' @note awaitTermination(StreamingQuery) since 2.2.0 +#' @note experimental +setMethod("awaitTermination", + signature(x = "StreamingQuery"), + function(x, timeout) { + handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout)) + }) + +#' stopQuery +#' +#' Stops the execution of this query if it is running. This method blocks until the execution is +#' stopped. +#' +#' @param x a StreamingQuery. +#' @rdname stopQuery +#' @name stopQuery +#' @aliases stopQuery,StreamingQuery-method +#' @family StreamingQuery methods +#' @export +#' @examples +#' \dontrun{ stopQuery(sq) } +#' @note stopQuery(StreamingQuery) since 2.2.0 +#' @note experimental +setMethod("stopQuery", + signature(x = "StreamingQuery"), + function(x) { + invisible(callJMethod(x@ssq, "stop")) + }) http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/R/utils.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/utils.R b/R/pkg/R/utils.R index 1f7848f..810de99 100644 --- a/R/pkg/R/utils.R +++ b/R/pkg/R/utils.R @@ -823,7 +823,16 @@ captureJVMException <- function(e, method) { stacktrace <- rawmsg } - if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) { + # StreamingQueryException could wrap an IllegalArgumentException, so look for that first + if (any(grep("org.apache.spark.sql.streaming.StreamingQueryException: ", stacktrace))) { + msg <- strsplit(stacktrace, "org.apache.spark.sql.streaming.StreamingQueryException: ", + fixed = TRUE)[[1]] + # Extract "Error in ..." message. + rmsg <- msg[1] + # Extract the first message of JVM exception. + first <- strsplit(msg[2], "\r?\n\tat")[[1]][1] + stop(paste0(rmsg, "streaming query error - ", first), call. = FALSE) + } else if (any(grep("java.lang.IllegalArgumentException: ", stacktrace))) { msg <- strsplit(stacktrace, "java.lang.IllegalArgumentException: ", fixed = TRUE)[[1]] # Extract "Error in ..." message. rmsg <- msg[1] http://git-wip-us.apache.org/repos/asf/spark/blob/5c165596/R/pkg/inst/tests/testthat/test_streaming.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_streaming.R b/R/pkg/inst/tests/testthat/test_streaming.R new file mode 100644 index 0000000..03b1bd3 --- /dev/null +++ b/R/pkg/inst/tests/testthat/test_streaming.R @@ -0,0 +1,150 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +library(testthat) + +context("Structured Streaming") + +# Tests for Structured Streaming functions in SparkR + +sparkSession <- sparkR.session(enableHiveSupport = FALSE) + +jsonSubDir <- file.path("sparkr-test", "json", "") +if (.Platform$OS.type == "windows") { + # file.path removes the empty separator on Windows, adds it back + jsonSubDir <- paste0(jsonSubDir, .Platform$file.sep) +} +jsonDir <- file.path(tempdir(), jsonSubDir) +dir.create(jsonDir, recursive = TRUE) + +mockLines <- c("{\"name\":\"Michael\"}", + "{\"name\":\"Andy\", \"age\":30}", + "{\"name\":\"Justin\", \"age\":19}") +jsonPath <- tempfile(pattern = jsonSubDir, fileext = ".tmp") +writeLines(mockLines, jsonPath) + +mockLinesNa <- c("{\"name\":\"Bob\",\"age\":16,\"height\":176.5}", + "{\"name\":\"Alice\",\"age\":null,\"height\":164.3}", + "{\"name\":\"David\",\"age\":60,\"height\":null}") +jsonPathNa <- tempfile(pattern = jsonSubDir, fileext = ".tmp") + +schema <- structType(structField("name", "string"), + structField("age", "integer"), + structField("count", "double")) + +test_that("read.stream, write.stream, awaitTermination, stopQuery", { + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 3) + + writeLines(mockLinesNa, jsonPathNa) + awaitTermination(q, 5 * 1000) + expect_equal(head(sql("SELECT count(*) FROM people"))[[1]], 6) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) +}) + +test_that("print from explain, lastProgress, status, isActive", { + df <- read.stream("json", path = jsonDir, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people2", outputMode = "complete") + + awaitTermination(q, 5 * 1000) + + expect_equal(capture.output(explain(q))[[1]], "== Physical Plan ==") + expect_true(any(grepl("\"description\" : \"MemorySink\"", capture.output(lastProgress(q))))) + expect_true(any(grepl("\"isTriggerActive\" : ", capture.output(status(q))))) + + expect_equal(queryName(q), "people2") + expect_true(isActive(q)) + + stopQuery(q) +}) + +test_that("Stream other format", { + parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet") + df <- read.df(jsonPath, "json", schema) + write.df(df, parquetPath, "parquet", "overwrite") + + df <- read.stream(path = parquetPath, schema = schema) + expect_true(isStreaming(df)) + counts <- count(group_by(df, "name")) + q <- write.stream(counts, "memory", queryName = "people3", outputMode = "complete") + + expect_false(awaitTermination(q, 5 * 1000)) + expect_equal(head(sql("SELECT count(*) FROM people3"))[[1]], 3) + + expect_equal(queryName(q), "people3") + expect_true(any(grepl("\"description\" : \"FileStreamSource[[:print:]]+parquet", + capture.output(lastProgress(q))))) + expect_true(isActive(q)) + + stopQuery(q) + expect_true(awaitTermination(q, 1)) + expect_false(isActive(q)) + + unlink(parquetPath) +}) + +test_that("Non-streaming DataFrame", { + c <- as.DataFrame(cars) + expect_false(isStreaming(c)) + + expect_error(write.stream(c, "memory", queryName = "people", outputMode = "complete"), + paste0(".*(writeStream : analysis error - 'writeStream' can be called only on ", + "streaming Dataset/DataFrame).*")) +}) + +test_that("Unsupported operation", { + # memory sink without aggregation + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1) + expect_error(write.stream(df, "memory", queryName = "people", outputMode = "complete"), + paste0(".*(start : analysis error - Complete output mode not supported when there ", + "are no streaming aggregations on streaming DataFrames/Datasets).*")) +}) + +test_that("Terminated by error", { + df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = -1) + counts <- count(group_by(df, "name")) + # This would not fail before returning with a StreamingQuery, + # but could dump error log at just about the same time + expect_error(q <- write.stream(counts, "memory", queryName = "people4", outputMode = "complete"), + NA) + + expect_error(awaitTermination(q, 1), + paste0(".*(awaitTermination : streaming query error - Invalid value '-1' for option", + " 'maxFilesPerTrigger', must be a positive integer).*")) + + expect_true(any(grepl("\"message\" : \"Terminated with exception: Invalid value", + capture.output(status(q))))) + expect_true(any(grepl("Streaming query has no progress", capture.output(lastProgress(q))))) + expect_equal(queryName(q), "people4") + expect_false(isActive(q)) + + stopQuery(q) +}) + +unlink(jsonPath) +unlink(jsonPathNa) + +sparkR.session.stop() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org