This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
     new 4d1d0a4  [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
4d1d0a4 is described below

commit 4d1d0a41a862c234acb9b8b68e96da7bf079eb8d
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Sat Mar 9 14:26:58 2019 -0800

    [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
    
    Before a Kafka consumer is assigned any partitions, its offsets will contain 0 partitions. However, `runContinuous` will still run and launch a Spark job with 0 partitions. In this case, there is a race in which an epoch may interrupt the query execution thread after `lastExecution.toRdd`, and either `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` or the next `runContinuous` call will be interrupted unintentionally.
    
    To handle this case, this PR has the following changes:
    
    - Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase the waiting time of `stop`, but the impact should be minor because the operations here are very fast (just sending an RPC message in the same process and stopping a very simple thread).
    - Clear the interrupted status at the end so that it won't impact the `runContinuous` call. We may clear an interrupted status set by `stop`, but that doesn't affect query termination because `runActivatedStream` will check `state` and exit accordingly.
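    The second change relies on standard JVM interrupt semantics: `Thread.interrupted()` returns the current thread's interrupted status and clears it, whereas `isInterrupted` only reads it. A minimal stand-alone sketch of that behavior in plain Scala (no Spark involved; `InterruptStatus` is a name made up for this sketch):

```scala
object InterruptStatus {
  /** Interrupts the current thread, then clears the status.
    * Returns (statusBeforeClear, statusAfterClear). */
  def setAndClear(): (Boolean, Boolean) = {
    Thread.currentThread().interrupt()                 // e.g. set by `stop`
    val before = Thread.currentThread().isInterrupted  // read only, not cleared
    val wasSet = Thread.interrupted()                  // read AND clear
    assert(wasSet)
    val after = Thread.currentThread().isInterrupted   // now cleared
    (before, after)
  }
}
```

    Because the status is cleared, a later blocking call on the same thread (here, the next `runContinuous`) won't see a stale interrupt and throw a spurious `InterruptedException`.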
    
    I also updated the cleanup code to make sure exceptions thrown from `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the cleanup.
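    The exception-safe cleanup ordering can be sketched with a plain nested try/finally: even if the RPC call throws, the remaining steps still run. This is only an illustrative sketch; `CleanupSketch` and the step-name strings are hypothetical stand-ins, not Spark APIs:

```scala
import scala.collection.mutable.ArrayBuffer

object CleanupSketch {
  /** Mimics the patched `finally` block: the RPC call may throw, but the
    * remaining cleanup steps still run because they sit in a nested `finally`.
    * Returns the names of the steps that actually ran. */
  def runAll(rpcFails: Boolean): Seq[String] = {
    val ran = ArrayBuffer.empty[String]
    try {
      try {
        if (rpcFails) sys.error("endpoint already stopped") // hypothetical failure
        ran += "askSync(StopContinuousExecutionWrites)"
      } finally {
        ran += "rpcEnv.stop(epochEndpoint)"
        ran += "epochUpdateThread.interrupt/join"
        ran += "stopSources"
        ran += "cancelJobGroup" // last: may fail if SparkContext is stopped
      }
    } catch {
      case _: RuntimeException => // outer catch only so the sketch can return
    }
    ran.toSeq
  }
}
```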
    
    Jenkins
    
    Closes #24034 from zsxwing/SPARK-27111.
    
    Authored-by: Shixiong Zhu <zsxw...@gmail.com>
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
    (cherry picked from commit 6e1c0827ece1cdc615196e60cb11c76b917b8eeb)
    Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../streaming/continuous/ContinuousExecution.scala | 32 ++++++++++++++++------
 1 file changed, 24 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 62adedb..dad7f9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -270,14 +270,30 @@ class ContinuousExecution(
         logInfo(s"Query $id ignoring exception from reconfiguring: $t")
        // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
-      epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-      SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-      epochUpdateThread.interrupt()
-      epochUpdateThread.join()
-
-      stopSources()
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      // The above execution may finish before getting interrupted, for example, a Spark job having
+      // 0 partitions will complete immediately. Then the interrupted status will sneak here.
+      //
+      // To handle this case, we do the two things here:
+      //
+      // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
+      //    the waiting time of `stop` but should be minor because the operations here are very fast
+      //    (just sending an RPC message in the same process and stopping a very simple thread).
+      // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
+      //    call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
+      //    termination because `runActivatedStream` will check `state` and exit accordingly.
+      queryExecutionThread.runUninterruptibly {
+        try {
+          epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+        } finally {
+          SparkEnv.get.rpcEnv.stop(epochEndpoint)
+          epochUpdateThread.interrupt()
+          epochUpdateThread.join()
+          stopSources()
+          // The following line must be the last line because it may fail if SparkContext is stopped
+          sparkSession.sparkContext.cancelJobGroup(runId.toString)
+        }
+      }
+      Thread.interrupted()
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
