Repository: spark
Updated Branches:
  refs/heads/master b7a036b75 -> f45793329


[SPARK-23416][SS] Add a specific stop method for ContinuousExecution.

## What changes were proposed in this pull request?

Add a specific stop method for ContinuousExecution. The previous 
StreamExecution.stop() method had a race condition as applied to continuous 
processing: if the cancellation was round-tripped to the driver too quickly, 
the generic SparkException it caused would be reported as the query death 
cause. We earlier decided that SparkException should not be added to the 
StreamExecution.isInterruptionException() whitelist, so we need to ensure this 
never happens instead.

## How was this patch tested?

Existing tests. I could consistently reproduce the previous flakiness by 
putting Thread.sleep(1000) between the first job cancellation and thread 
interruption in StreamExecution.stop().

Author: Jose Torres <torres.joseph.f+git...@gmail.com>

Closes #21384 from jose-torres/fixKafka.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f4579332
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f4579332
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f4579332

Branch: refs/heads/master
Commit: f4579332931c9bf424d0b6147fad89bd63da26f6
Parents: b7a036b
Author: Jose Torres <torres.joseph.f+git...@gmail.com>
Authored: Wed May 23 17:21:29 2018 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed May 23 17:21:29 2018 -0700

----------------------------------------------------------------------
 .../execution/streaming/MicroBatchExecution.scala | 18 ++++++++++++++++++
 .../sql/execution/streaming/StreamExecution.scala | 18 ------------------
 .../continuous/ContinuousExecution.scala          | 16 ++++++++++++++++
 3 files changed, 34 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f4579332/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6709e70..7817360 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -127,6 +127,24 @@ class MicroBatchExecution(
   }
 
   /**
+   * Signals to the thread executing micro-batches that it should stop running 
after the next
+   * batch. This method blocks until the thread stops running.
+   */
+  override def stop(): Unit = {
+    // Set the state to TERMINATED so that the batching thread knows that it 
was interrupted
+    // intentionally
+    state.set(TERMINATED)
+    if (queryExecutionThread.isAlive) {
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
+      // microBatchThread may spawn new jobs, so we need to cancel again to 
prevent a leak
+      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+    }
+    logInfo(s"Query $prettyIdString was stopped")
+  }
+
+  /**
    * Repeatedly attempts to run batches as data arrives.
    */
   protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit 
= {

http://git-wip-us.apache.org/repos/asf/spark/blob/f4579332/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 3fc8c78..290de87 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -379,24 +379,6 @@ abstract class StreamExecution(
   }
 
   /**
-   * Signals to the thread executing micro-batches that it should stop running 
after the next
-   * batch. This method blocks until the thread stops running.
-   */
-  override def stop(): Unit = {
-    // Set the state to TERMINATED so that the batching thread knows that it 
was interrupted
-    // intentionally
-    state.set(TERMINATED)
-    if (queryExecutionThread.isAlive) {
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
-      queryExecutionThread.interrupt()
-      queryExecutionThread.join()
-      // microBatchThread may spawn new jobs, so we need to cancel again to 
prevent a leak
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
-    }
-    logInfo(s"Query $prettyIdString was stopped")
-  }
-
-  /**
    * Blocks the current thread until processing for data from the given 
`source` has reached at
    * least the given `Offset`. This method is intended for use primarily when 
writing tests.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/f4579332/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 0e7d101..d16b24c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -356,6 +356,22 @@ class ContinuousExecution(
       }
     }
   }
+
+  /**
+   * Stops the query execution thread to terminate the query.
+   */
+  override def stop(): Unit = {
+    // Set the state to TERMINATED so that the batching thread knows that it 
was interrupted
+    // intentionally
+    state.set(TERMINATED)
+    if (queryExecutionThread.isAlive) {
+      // The query execution thread will clean itself up in the finally clause 
of runContinuous.
+      // We just need to interrupt the long running job.
+      queryExecutionThread.interrupt()
+      queryExecutionThread.join()
+    }
+    logInfo(s"Query $prettyIdString was stopped")
+  }
 }
 
 object ContinuousExecution {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to