Repository: spark
Updated Branches:
  refs/heads/branch-2.2 994d9da90 -> c890e938c


[SPARK-20541][SPARKR][SS] support awaitTermination without timeout

## What changes were proposed in this pull request?

Add without param for timeout - will need this to submit a job that runs until 
stopped
Need this for 2.2

## How was this patch tested?

manually, unit test

Author: Felix Cheung <felixcheun...@hotmail.com>

Closes #17815 from felixcheung/rssawaitinfinite.

(cherry picked from commit a355b667a3718d9c5d48a0781e836bf5418ab842)
Signed-off-by: Felix Cheung <felixche...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c890e938
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c890e938
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c890e938

Branch: refs/heads/branch-2.2
Commit: c890e938c520a9cefd9484e2324c891c9a1ec2ae
Parents: 994d9da
Author: Felix Cheung <felixcheun...@hotmail.com>
Authored: Sun Apr 30 23:23:49 2017 -0700
Committer: Felix Cheung <felixche...@apache.org>
Committed: Sun Apr 30 23:24:03 2017 -0700

----------------------------------------------------------------------
 R/pkg/R/generics.R                         |  2 +-
 R/pkg/R/streaming.R                        | 14 ++++++++++----
 R/pkg/inst/tests/testthat/test_streaming.R |  1 +
 3 files changed, 12 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c890e938/R/pkg/R/generics.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 945676c..b23abe6 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1469,7 +1469,7 @@ setGeneric("write.ml", function(object, path, ...) { 
standardGeneric("write.ml")
 
 #' @rdname awaitTermination
 #' @export
-setGeneric("awaitTermination", function(x, timeout) { 
standardGeneric("awaitTermination") })
+setGeneric("awaitTermination", function(x, timeout = NULL) { 
standardGeneric("awaitTermination") })
 
 #' @rdname isActive
 #' @export

http://git-wip-us.apache.org/repos/asf/spark/blob/c890e938/R/pkg/R/streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/R/streaming.R b/R/pkg/R/streaming.R
index e353d2d..8390bd5 100644
--- a/R/pkg/R/streaming.R
+++ b/R/pkg/R/streaming.R
@@ -169,8 +169,10 @@ setMethod("isActive",
 #' immediately.
 #'
 #' @param x a StreamingQuery.
-#' @param timeout time to wait in milliseconds
-#' @return TRUE if query has terminated within the timeout period.
+#' @param timeout time to wait in milliseconds, if omitted, wait indefinitely 
until \code{stopQuery}
+#'                is called or an error has occured.
+#' @return TRUE if query has terminated within the timeout period; nothing if 
timeout is not
+#'         specified.
 #' @rdname awaitTermination
 #' @name awaitTermination
 #' @aliases awaitTermination,StreamingQuery-method
@@ -182,8 +184,12 @@ setMethod("isActive",
 #' @note experimental
 setMethod("awaitTermination",
           signature(x = "StreamingQuery"),
-          function(x, timeout) {
-            handledCallJMethod(x@ssq, "awaitTermination", as.integer(timeout))
+          function(x, timeout = NULL) {
+            if (is.null(timeout)) {
+              invisible(handledCallJMethod(x@ssq, "awaitTermination"))
+            } else {
+              handledCallJMethod(x@ssq, "awaitTermination", 
as.integer(timeout))
+            }
           })
 
 #' stopQuery

http://git-wip-us.apache.org/repos/asf/spark/blob/c890e938/R/pkg/inst/tests/testthat/test_streaming.R
----------------------------------------------------------------------
diff --git a/R/pkg/inst/tests/testthat/test_streaming.R 
b/R/pkg/inst/tests/testthat/test_streaming.R
index 1f4054a..b125cb0 100644
--- a/R/pkg/inst/tests/testthat/test_streaming.R
+++ b/R/pkg/inst/tests/testthat/test_streaming.R
@@ -61,6 +61,7 @@ test_that("read.stream, write.stream, awaitTermination, 
stopQuery", {
 
   stopQuery(q)
   expect_true(awaitTermination(q, 1))
+  expect_error(awaitTermination(q), NA)
 })
 
 test_that("print from explain, lastProgress, status, isActive", {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to