Repository: spark
Updated Branches:
  refs/heads/branch-1.1 4895f6544 -> 78cd3ab88


[SPARK-4301] StreamingContext should not allow start() to be called after 
calling stop()

In Spark 1.0.0+, calling `stop()` on a StreamingContext that has not been 
started is a no-op which has no side-effects. This allows users to call 
`stop()` on a fresh StreamingContext followed by `start()`. I believe that this 
almost always indicates an error and is not behavior that we should support. 
Since we don't allow `start() stop() start()` then I don't think it makes sense 
to allow `stop() start()`.

The current behavior can lead to resource leaks when StreamingContext 
constructs its own SparkContext: if I call `stop(stopSparkContext=True)`, then 
I expect StreamingContext's underlying SparkContext to be stopped irrespective 
of whether the StreamingContext has been started. This is useful when writing 
unit test fixtures.

Prior discussions:
- https://github.com/apache/spark/pull/3053#discussion-diff-19710333R490
- https://github.com/apache/spark/pull/3121#issuecomment-61927353

Author: Josh Rosen <joshro...@databricks.com>

Closes #3160 from JoshRosen/SPARK-4301 and squashes the following commits:

dbcc929 [Josh Rosen] Address more review comments
bdbe5da [Josh Rosen] Stop SparkContext after stopping scheduler, not before.
03e9c40 [Josh Rosen] Always stop SparkContext, even if stop(false) has already 
been called.
832a7f4 [Josh Rosen] Address review comment
5142517 [Josh Rosen] Add tests; improve Scaladoc.
813e471 [Josh Rosen] Revert workaround added in 
https://github.com/apache/spark/pull/3053/files#diff-e144dbee130ed84f9465853ddce65f8eR49
5558e70 [Josh Rosen] StreamingContext.stop() should stop SparkContext even if 
StreamingContext has not been started yet.

(cherry picked from commit 7b41b17f3296eea3282efbdceb6b28baf128287d)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78cd3ab8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78cd3ab8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78cd3ab8

Branch: refs/heads/branch-1.1
Commit: 78cd3ab880cb5fe61a155fec50e7c2cc60872a52
Parents: 4895f65
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sat Nov 8 18:10:23 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Sat Nov 8 18:12:02 2014 -0800

----------------------------------------------------------------------
 .../spark/streaming/StreamingContext.scala      | 38 +++++++++++---------
 .../spark/streaming/StreamingContextSuite.scala | 21 +++++++++--
 2 files changed, 40 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78cd3ab8/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index e8a863d..f4bfc08 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -436,10 +436,10 @@ class StreamingContext private[streaming] (
 
   /**
    * Start the execution of the streams.
+   *
+   * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
-    // Throw exception if the context has already been started once
-    // or if a stopped context is being started again
     if (state == Started) {
       throw new SparkException("StreamingContext has already been started")
     }
@@ -472,8 +472,10 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams immediately (does not wait for all 
received data
    * to be processed).
-   * @param stopSparkContext Stop the associated SparkContext or not
    *
+   * @param stopSparkContext if true, stops the associated SparkContext. The 
underlying SparkContext
+   *                         will be stopped regardless of whether this 
StreamingContext has been
+   *                         started.
    */
   def stop(stopSparkContext: Boolean = true): Unit = synchronized {
     stop(stopSparkContext, false)
@@ -482,25 +484,27 @@ class StreamingContext private[streaming] (
   /**
    * Stop the execution of the streams, with option of ensuring all received 
data
    * has been processed.
-   * @param stopSparkContext Stop the associated SparkContext or not
-   * @param stopGracefully Stop gracefully by waiting for the processing of all
+   *
+   * @param stopSparkContext if true, stops the associated SparkContext. The 
underlying SparkContext
+   *                         will be stopped regardless of whether this 
StreamingContext has been
+   *                         started.
+   * @param stopGracefully if true, stops gracefully by waiting for the 
processing of all
    *                       received data to be completed
    */
   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = 
synchronized {
-    // Warn (but not fail) if context is stopped twice,
-    // or context is stopped before starting
-    if (state == Initialized) {
-      logWarning("StreamingContext has not been started yet")
-      return
+    state match {
+      case Initialized => logWarning("StreamingContext has not been started 
yet")
+      case Stopped => logWarning("StreamingContext has already been stopped")
+      case Started =>
+        scheduler.stop(stopGracefully)
+        logInfo("StreamingContext stopped successfully")
+        waiter.notifyStop()
     }
-    if (state == Stopped) {
-      logWarning("StreamingContext has already been stopped")
-      return
-    } // no need to throw an exception as its okay to stop twice
-    scheduler.stop(stopGracefully)
-    logInfo("StreamingContext stopped successfully")
-    waiter.notifyStop()
+    // Even if the streaming context has not been started, we still need to 
stop the SparkContext.
+    // Even if we have already stopped, we still need to attempt to stop the 
SparkContext because
+    // a user might stop(stopSparkContext = false) and then call 
stop(stopSparkContext = true).
     if (stopSparkContext) sc.stop()
+    // The state should always be Stopped after calling `stop()`, even if we 
haven't started yet:
     state = Stopped
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/78cd3ab8/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index f48c89c..29c1159 100644
--- 
a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ 
b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -135,11 +135,16 @@ class StreamingContextSuite extends FunSuite with 
BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
-  test("stop before start and start after stop") {
+  test("stop before start") {
     ssc = new StreamingContext(master, appName, batchDuration)
     addInputStream(ssc).register
     ssc.stop()  // stop before start should not throw exception
-    ssc.start()
+  }
+
+  test("start after stop") {
+    // Regression test for SPARK-4301
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
     ssc.stop()
     intercept[SparkException] {
       ssc.start() // start after stop should throw exception
@@ -159,6 +164,18 @@ class StreamingContextSuite extends FunSuite with 
BeforeAndAfter with Timeouts w
     ssc.stop()
   }
 
+  test("stop(stopSparkContext=true) after stop(stopSparkContext=false)") {
+    ssc = new StreamingContext(master, appName, batchDuration)
+    addInputStream(ssc).register()
+    ssc.stop(stopSparkContext = false)
+    assert(ssc.sc.makeRDD(1 to 100).collect().size === 100)
+    ssc.stop(stopSparkContext = true)
+    // Check that the SparkContext is actually stopped:
+    intercept[Exception] {
+      ssc.sc.makeRDD(1 to 100).collect()
+    }
+  }
+
   test("stop gracefully") {
     val conf = new SparkConf().setMaster(master).setAppName(appName)
     conf.set("spark.cleaner.ttl", "3600")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to