Repository: spark Updated Branches: refs/heads/branch-1.6 862d788fc -> b18112666
[STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()` There is a race condition in `FileBasedWriteAheadLog.close()`, where if delete's of old log files are in progress, the write ahead log may close, and result in a `RejectedExecutionException`. This is okay, and should be handled gracefully. Example test failures: https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/ The reason the test fails is in `afterEach`, `writeAheadLog.close` is called, and there may still be async deletes in flight. tdas zsxwing Author: Burak Yavuz <brk...@gmail.com> Closes #9953 from brkyvz/flaky-ss. (cherry picked from commit a5d988763319f63a8e2b58673dd4f9098f17c835) Signed-off-by: Shixiong Zhu <shixi...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1811266 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1811266 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1811266 Branch: refs/heads/branch-1.6 Commit: b18112666adec0a942d1cfe8d6b9f1e7c7201fcd Parents: 862d788 Author: Burak Yavuz <brk...@gmail.com> Authored: Tue Nov 24 20:58:47 2015 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Tue Nov 24 20:59:01 2015 -0800 ---------------------------------------------------------------------- .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b1811266/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala index 72705f1..f5165f7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.util import java.nio.ByteBuffer -import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor} import java.util.{Iterator => JIterator} import scala.collection.JavaConverters._ @@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog( } oldLogFiles.foreach { logInfo => if (!executionContext.isShutdown) { - val f = Future { deleteFile(logInfo) }(executionContext) - if (waitForCompletion) { - import scala.concurrent.duration._ - Await.ready(f, 1 second) + try { + val f = Future { deleteFile(logInfo) }(executionContext) + if (waitForCompletion) { + import scala.concurrent.duration._ + Await.ready(f, 1 second) + } + } catch { + case e: RejectedExecutionException => + logWarning("Execution context shutdown before deleting old WriteAheadLogs. " + + "This would not affect recovery correctness.", e) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org