Repository: spark Updated Branches: refs/heads/master 185f5bc7d -> 7ec83658f
[SPARK-23491][SS] Remove explicit job cancellation from ContinuousExecution reconfiguring ## What changes were proposed in this pull request? Remove queryExecutionThread.interrupt() from ContinuousExecution. As detailed in the JIRA, interrupting the thread is only relevant in the microbatch case; for continuous processing the query execution can quickly clean itself up without. ## How was this patch tested? existing tests Author: Jose Torres <j...@databricks.com> Closes #20622 from jose-torres/SPARK-23441. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7ec83658 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7ec83658 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7ec83658 Branch: refs/heads/master Commit: 7ec83658fbc88505dfc2d8a6f76e90db747f1292 Parents: 185f5bc Author: Jose Torres <j...@databricks.com> Authored: Mon Feb 26 11:28:44 2018 -0800 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Mon Feb 26 11:28:44 2018 -0800 ---------------------------------------------------------------------- .../streaming/continuous/ContinuousExecution.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7ec83658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 2c1d6c5..daebd1d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -236,9 +236,7 @@ class ContinuousExecution( startTrigger() if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) { - stopSources() if (queryExecutionThread.isAlive) { - sparkSession.sparkContext.cancelJobGroup(runId.toString) queryExecutionThread.interrupt() } false @@ -266,12 +264,20 @@ class ContinuousExecution( SQLExecution.withNewExecutionId( sparkSessionForQuery, lastExecution)(lastExecution.toRdd) } + } catch { + case t: Throwable + if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING => + logInfo(s"Query $id ignoring exception from reconfiguring: $t") + // interrupted by reconfiguration - swallow exception so we can restart the query } finally { epochEndpoint.askSync[Unit](StopContinuousExecutionWrites) SparkEnv.get.rpcEnv.stop(epochEndpoint) epochUpdateThread.interrupt() epochUpdateThread.join() + + stopSources() + sparkSession.sparkContext.cancelJobGroup(runId.toString) } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org