Repository: spark Updated Branches: refs/heads/master 6aed719e5 -> 4cf4cba08
[SPARK-5379][Streaming] Add awaitTerminationOrTimeout Added `awaitTerminationOrTimeout` to return if the waiting time elapsed: * `true` if it's stopped. * `false` if the waiting time elapsed before returning from the method. * throw the reported error if it's thrown during the execution. Also deprecated `awaitTermination(timeout: Long)`. Author: zsxwing <zsxw...@gmail.com> Closes #4171 from zsxwing/SPARK-5379 and squashes the following commits: c9e660b [zsxwing] Add a unit test for awaitTerminationOrTimeout 8a89f92 [zsxwing] Add awaitTerminationOrTimeout to python cdc820b [zsxwing] Add awaitTerminationOrTimeout Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cf4cba0 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cf4cba0 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cf4cba0 Branch: refs/heads/master Commit: 4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f Parents: 6aed719 Author: zsxwing <zsxw...@gmail.com> Authored: Wed Feb 4 00:40:28 2015 -0800 Committer: Tathagata Das <t...@databricks.com> Committed: Wed Feb 4 00:40:28 2015 -0800 ---------------------------------------------------------------------- python/pyspark/streaming/context.py | 9 ++++++++ .../spark/streaming/StreamingContext.scala | 13 +++++++++++ .../api/java/JavaStreamingContext.scala | 13 +++++++++++ .../spark/streaming/StreamingContextSuite.scala | 24 ++++++++++++++++++++ 4 files changed, 59 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/python/pyspark/streaming/context.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 18aaae9..b06ab65 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -191,6 +191,15 @@ class StreamingContext(object): else: self._jssc.awaitTermination(int(timeout * 1000)) + def awaitTerminationOrTimeout(self, timeout): + """ + Wait for the execution to stop. Return `true` if it's stopped; or + throw the reported error during the execution; or `false` if the + waiting time elapsed before returning from the method. + @param timeout: time to wait in seconds + """ + self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) + def stop(self, stopSparkContext=True, stopGraceFully=False): """ Stop the execution of the streams, with option of ensuring all http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ddc435c..ba3f234 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -526,11 +526,24 @@ class StreamingContext private[streaming] ( * will be thrown in this thread. * @param timeout time to wait in milliseconds */ + @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0") def awaitTermination(timeout: Long) { waiter.waitForStopOrError(timeout) } /** + * Wait for the execution to stop. Any exceptions that occurs during the execution + * will be thrown in this thread. + * + * @param timeout time to wait in milliseconds + * @return `true` if it's stopped; or throw the reported error during the execution; or `false` + * if the waiting time elapsed before returning from the method. + */ + def awaitTerminationOrTimeout(timeout: Long): Boolean = { + waiter.waitForStopOrError(timeout) + } + + /** * Stop the execution of the streams immediately (does not wait for all received data * to be processed). * http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala index 0f7ae7a..e3db01c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala @@ -597,11 +597,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable { * will be thrown in this thread. * @param timeout time to wait in milliseconds */ + @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0") def awaitTermination(timeout: Long): Unit = { ssc.awaitTermination(timeout) } /** + * Wait for the execution to stop. Any exceptions that occurs during the execution + * will be thrown in this thread. + * + * @param timeout time to wait in milliseconds + * @return `true` if it's stopped; or throw the reported error during the execution; or `false` + * if the waiting time elapsed before returning from the method. + */ + def awaitTerminationOrTimeout(timeout: Long): Boolean = { + ssc.awaitTerminationOrTimeout(timeout) + } + + /** * Stop the execution of the streams. Will stop the associated JavaSparkContext as well. */ def stop(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 0b5af25..2aa5e08 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -304,6 +304,30 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w assert(exception.getMessage.contains("transform"), "Expected exception not thrown") } + test("awaitTerminationOrTimeout") { + ssc = new StreamingContext(master, appName, batchDuration) + val inputStream = addInputStream(ssc) + inputStream.map(x => x).register() + + ssc.start() + + // test whether awaitTerminationOrTimeout() return false after give amount of time + failAfter(1000 millis) { + assert(ssc.awaitTerminationOrTimeout(500) === false) + } + + // test whether awaitTerminationOrTimeout() return true if context is stopped + failAfter(10000 millis) { // 10 seconds because spark takes a long time to shutdown + new Thread() { + override def run() { + Thread.sleep(500) + ssc.stop() + } + }.start() + assert(ssc.awaitTerminationOrTimeout(10000) === true) + } + } + test("DStream and generated RDD creation sites") { testPackage.test() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org