Repository: spark
Updated Branches:
  refs/heads/master 6aed719e5 -> 4cf4cba08


[SPARK-5379][Streaming] Add awaitTerminationOrTimeout

Added `awaitTerminationOrTimeout` to return if the waiting time elapsed:
* `true` if it's stopped.
* `false` if the waiting time elapsed before returning from the method.
* throw the reported error if it's thrown during the execution.

Also deprecated `awaitTermination(timeout: Long)`.

Author: zsxwing <zsxw...@gmail.com>

Closes #4171 from zsxwing/SPARK-5379 and squashes the following commits:

c9e660b [zsxwing] Add a unit test for awaitTerminationOrTimeout
8a89f92 [zsxwing] Add awaitTerminationOrTimeout to python
cdc820b [zsxwing] Add awaitTerminationOrTimeout


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4cf4cba0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4cf4cba0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4cf4cba0

Branch: refs/heads/master
Commit: 4cf4cba08f1757ec0d9bffdfae6db719a4fb5a3f
Parents: 6aed719
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Feb 4 00:40:28 2015 -0800
Committer: Tathagata Das <t...@databricks.com>
Committed: Wed Feb 4 00:40:28 2015 -0800

----------------------------------------------------------------------
 python/pyspark/streaming/context.py             |  9 ++++++++
 .../spark/streaming/StreamingContext.scala      | 13 +++++++++++
 .../api/java/JavaStreamingContext.scala         | 13 +++++++++++
 .../spark/streaming/StreamingContextSuite.scala | 24 ++++++++++++++++++++
 4 files changed, 59 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/python/pyspark/streaming/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 18aaae9..b06ab65 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -191,6 +191,15 @@ class StreamingContext(object):
         else:
             self._jssc.awaitTermination(int(timeout * 1000))
 
+    def awaitTerminationOrTimeout(self, timeout):
+        """
+        Wait for the execution to stop. Return `True` if it's stopped; or
+        throw the reported error during the execution; or `False` if the
+        waiting time elapsed before returning from the method.
+        @param timeout: time to wait in seconds (sent to the JVM in milliseconds)
+        """
+        return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
+
     def stop(self, stopSparkContext=True, stopGraceFully=False):
         """
         Stop the execution of the streams, with option of ensuring all

http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ddc435c..ba3f234 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -526,11 +526,24 @@ class StreamingContext private[streaming] (
    * will be thrown in this thread.
    * @param timeout time to wait in milliseconds
    */
+  @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
   def awaitTermination(timeout: Long) {
     waiter.waitForStopOrError(timeout)
   }
 
   /**
+   * Wait for the execution to stop. Any exceptions that occur during the execution
+   * will be thrown in this thread.
+   *
+   * @param timeout time to wait in milliseconds
+   * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
+   *         if the waiting time elapsed before returning from the method.
+   */
+  def awaitTerminationOrTimeout(timeout: Long): Boolean = {
+    waiter.waitForStopOrError(timeout)
+  }
+
+  /**
    * Stop the execution of the streams immediately (does not wait for all received data
    * to be processed).
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 0f7ae7a..e3db01c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -597,11 +597,24 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
    * will be thrown in this thread.
    * @param timeout time to wait in milliseconds
    */
+  @deprecated("Use awaitTerminationOrTimeout(Long) instead", "1.3.0")
   def awaitTermination(timeout: Long): Unit = {
     ssc.awaitTermination(timeout)
   }
 
   /**
+   * Wait for the execution to stop. Any exceptions that occur during the execution
+   * will be thrown in this thread.
+   *
+   * @param timeout time to wait in milliseconds
+   * @return `true` if it's stopped; or throw the reported error during the execution; or `false`
+   *         if the waiting time elapsed before returning from the method.
+   */
+  def awaitTerminationOrTimeout(timeout: Long): Boolean = {
+    ssc.awaitTerminationOrTimeout(timeout)
+  }
+
+  /**
    * Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
    */
   def stop(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/4cf4cba0/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 0b5af25..2aa5e08 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -304,6 +304,30 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     assert(exception.getMessage.contains("transform"), "Expected exception not thrown")
   }
 
+  test("awaitTerminationOrTimeout") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    val inputStream = addInputStream(ssc)
+    inputStream.map(x => x).register()
+
+    ssc.start()
+
+    // test whether awaitTerminationOrTimeout() returns false after the given amount of time
+    failAfter(1000 millis) {
+      assert(ssc.awaitTerminationOrTimeout(500) === false)
+    }
+
+    // test whether awaitTerminationOrTimeout() returns true if the context is stopped
+    failAfter(10000 millis) { // 10 seconds because Spark takes a long time to shut down
+      new Thread() {
+        override def run() {
+          Thread.sleep(500)
+          ssc.stop()
+        }
+      }.start()
+      assert(ssc.awaitTerminationOrTimeout(10000) === true)
+    }
+  }
+
   test("DStream and generated RDD creation sites") {
     testPackage.test()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to