Repository: spark Updated Branches: refs/heads/master c8d0aba19 -> ab866f117
[SPARK-21248][SS] The clean up codes in StreamExecution should not be interrupted ## What changes were proposed in this pull request? This PR uses `runUninterruptibly` to prevent the cleanup code in StreamExecution from being interrupted. It also removes an optimization in `runUninterruptibly` to make sure this method never throws `InterruptedException`. ## How was this patch tested? Jenkins Author: Shixiong Zhu <shixi...@databricks.com> Closes #18461 from zsxwing/SPARK-21248. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab866f11 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab866f11 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab866f11 Branch: refs/heads/master Commit: ab866f117378e64dba483ead51b769ae7be31d4d Parents: c8d0aba Author: Shixiong Zhu <shixi...@databricks.com> Authored: Wed Jul 5 18:26:28 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jul 5 18:26:28 2017 -0700 ---------------------------------------------------------------------- .../org/apache/spark/util/UninterruptibleThread.scala | 10 +--------- .../apache/spark/util/UninterruptibleThreadSuite.scala | 5 ++--- .../spark/sql/execution/streaming/StreamExecution.scala | 6 +++++- 3 files changed, 8 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/ab866f11/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala index 27922b3..6a58ec1 100644 --- a/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala +++ b/core/src/main/scala/org/apache/spark/util/UninterruptibleThread.scala @@ -55,9 +55,6 @@ private[spark] class UninterruptibleThread( 
* Run `f` uninterruptibly in `this` thread. The thread won't be interrupted before returning * from `f`. * - * If this method finds that `interrupt` is called before calling `f` and it's not inside another - * `runUninterruptibly`, it will throw `InterruptedException`. - * * Note: this method should be called only in `this` thread. */ def runUninterruptibly[T](f: => T): T = { @@ -73,12 +70,7 @@ private[spark] class UninterruptibleThread( uninterruptibleLock.synchronized { // Clear the interrupted status if it's set. - if (Thread.interrupted() || shouldInterruptThread) { - shouldInterruptThread = false - // Since it's interrupted, we don't need to run `f` which may be a long computation. - // Throw InterruptedException as we don't have a T to return. - throw new InterruptedException() - } + shouldInterruptThread = Thread.interrupted() || shouldInterruptThread uninterruptible = true } try { http://git-wip-us.apache.org/repos/asf/spark/blob/ab866f11/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala index 39b31f8..6a190f6 100644 --- a/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UninterruptibleThreadSuite.scala @@ -68,7 +68,6 @@ class UninterruptibleThreadSuite extends SparkFunSuite { Uninterruptibles.awaitUninterruptibly(interruptLatch, 10, TimeUnit.SECONDS) try { runUninterruptibly { - assert(false, "Should not reach here") } } catch { case _: InterruptedException => hasInterruptedException = true @@ -80,8 +79,8 @@ class UninterruptibleThreadSuite extends SparkFunSuite { t.interrupt() interruptLatch.countDown() t.join() - assert(hasInterruptedException === true) - assert(interruptStatusBeforeExit === false) + 
assert(hasInterruptedException === false) + assert(interruptStatusBeforeExit === true) } test("nested runUninterruptibly") { http://git-wip-us.apache.org/repos/asf/spark/blob/ab866f11/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala index d5f8d2a..10c42a7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala @@ -357,7 +357,11 @@ class StreamExecution( if (!NonFatal(e)) { throw e } - } finally { + } finally microBatchThread.runUninterruptibly { + // The whole `finally` block must run inside `runUninterruptibly` to avoid being interrupted + // when a query is stopped by the user. We need to make sure the following codes finish + // otherwise it may throw `InterruptedException` to `UncaughtExceptionHandler` (SPARK-21248). + // Release latches to unblock the user codes since exception can happen in any place and we // may not get a chance to release them startLatch.countDown() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org