This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 6e1c082  [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException
6e1c082 is described below

commit 6e1c0827ece1cdc615196e60cb11c76b917b8eeb
Author: Shixiong Zhu <zsxw...@gmail.com>
AuthorDate: Sat Mar 9 14:26:58 2019 -0800

    [SPARK-27111][SS] Fix a race that a continuous query may fail with InterruptedException

    ## What changes were proposed in this pull request?

    Before a Kafka consumer is assigned any partitions, its offsets will contain 0 partitions.
    However, `runContinuous` will still run and launch a Spark job with 0 partitions. In this
    case, there is a race: an epoch may interrupt the query execution thread after
    `lastExecution.toRdd`, and either `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)`
    or the next `runContinuous` call will get interrupted unintentionally.

    To handle this case, this PR makes the following changes:

    - Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
      the waiting time of `stop`, but the impact should be minor because the operations here
      are very fast (just sending an RPC message in the same process and stopping a very
      simple thread).
    - Clear the interrupted status at the end so that it won't impact the `runContinuous`
      call. We may clear an interrupted status set by `stop`, but that doesn't affect query
      termination because `runActivatedStream` checks `state` and exits accordingly.

    The clean-up code is also updated to make sure exceptions thrown from
    `epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)` won't stop the clean-up.

    ## How was this patch tested?

    Jenkins

    Closes #24034 from zsxwing/SPARK-27111.
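The two-part fix above relies on standard JVM interrupt semantics: a thread's interrupted status is just a flag, non-blocking cleanup code ignores it, and `Thread.interrupted()` reads the flag *and* clears it so the next iteration starts clean. The following is a minimal illustrative sketch of those semantics in Java (the actual fix is the Scala code in the patch; `InterruptRace` and `simulate` are hypothetical names, and Spark's `runUninterruptibly` is here stood in for by plain non-blocking code):

```java
public class InterruptRace {
    // Simulates the race described in the commit message: the work has
    // already finished when an interrupt arrives and sets the thread's
    // interrupted status; cleanup then runs, and the flag is cleared.
    static boolean[] simulate() throws InterruptedException {
        final boolean[] result = new boolean[3];
        Thread worker = new Thread(() -> {
            // The interrupt "sneaks in" after the job completed.
            Thread.currentThread().interrupt();

            // The flag is set, but non-blocking cleanup code is unaffected
            // (only blocking calls like join/askSync would throw, which is
            // why the real fix defers interrupts during cleanup).
            result[0] = Thread.currentThread().isInterrupted();

            // Thread.interrupted() returns the status AND clears it, so the
            // next loop iteration (the next runContinuous) starts clean.
            result[1] = Thread.interrupted();
            result[2] = Thread.currentThread().isInterrupted();
        });
        worker.start();
        worker.join();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        boolean[] r = simulate();
        System.out.println("set=" + r[0] + " cleared=" + r[1] + " after=" + r[2]);
    }
}
```

Note that `isInterrupted()` only observes the flag, while the static `Thread.interrupted()` clears it; the patch deliberately uses the latter as the last statement of the `finally` block.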
Authored-by: Shixiong Zhu <zsxw...@gmail.com>
Signed-off-by: Shixiong Zhu <zsxw...@gmail.com>
---
 .../streaming/continuous/ContinuousExecution.scala | 30 +++++++++++++++++-----
 1 file changed, 23 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 26b5642..aef556d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -268,13 +268,29 @@ class ContinuousExecution(
       logInfo(s"Query $id ignoring exception from reconfiguring: $t")
       // interrupted by reconfiguration - swallow exception so we can restart the query
     } finally {
-      epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
-      SparkEnv.get.rpcEnv.stop(epochEndpoint)
-
-      epochUpdateThread.interrupt()
-      epochUpdateThread.join()
-
-      sparkSession.sparkContext.cancelJobGroup(runId.toString)
+      // The above execution may finish before getting interrupted, for example, a Spark job having
+      // 0 partitions will complete immediately. Then the interrupted status will sneak here.
+      //
+      // To handle this case, we do the two things here:
+      //
+      // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
+      // the waiting time of `stop` but should be minor because the operations here are very fast
+      // (just sending an RPC message in the same process and stopping a very simple thread).
+      // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
+      // call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
+      // termination because `runActivatedStream` will check `state` and exit accordingly.
+      queryExecutionThread.runUninterruptibly {
+        try {
+          epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
+        } finally {
+          SparkEnv.get.rpcEnv.stop(epochEndpoint)
+          epochUpdateThread.interrupt()
+          epochUpdateThread.join()
+          // The following line must be the last line because it may fail if SparkContext is stopped
+          sparkSession.sparkContext.cancelJobGroup(runId.toString)
+        }
+      }
+      Thread.interrupted()
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org