spark git commit: [STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 4d6bbbc03 -> a5d988763


[STREAMING][FLAKY-TEST] Catch execution context race condition in 
`FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of old 
log files are still in progress, the write ahead log may close first, resulting 
in a `RejectedExecutionException`. This is harmless and should be handled 
gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The test fails because `afterEach` calls `writeAheadLog.close` while async 
deletes may still be in flight.
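The failure mode is easy to reproduce outside Spark: once the executor backing an `ExecutionContext` has been shut down, submitting a `Future` to it is rejected. A minimal standalone sketch (not part of the patch; the empty body stands in for `deleteFile(logInfo)`):

```scala
import java.util.concurrent.{Executors, RejectedExecutionException}

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Failure

object RejectedExecutionDemo {
  /** Mirrors the clean()/close() race: submit a delete after the pool shut down. */
  def submitAfterShutdownIsRejected(): Boolean = {
    val executionContext =
      ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())
    executionContext.shutdown() // simulates close() winning the race

    try {
      // Future.apply hands the body to the executor right away, so a
      // shut-down ThreadPoolExecutor rejects it.
      val f = Future { /* deleteFile(logInfo) would run here */ () }(executionContext)
      Await.ready(f, 1.second)
      // On some Scala versions the rejection fails the Future instead of
      // being thrown in the submitting thread.
      f.value match {
        case Some(Failure(_: RejectedExecutionException)) => true
        case _ => false
      }
    } catch {
      case _: RejectedExecutionException => true // thrown synchronously, as in the test failure
    }
  }

  def main(args: Array[String]): Unit =
    println(s"rejected=${submitAfterShutdownIsRejected()}")
}
```

Because the rejection can only happen after the log has been closed, catching it and logging a warning (rather than propagating) is safe: skipping a delete does not affect recovery correctness.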

tdas zsxwing

Author: Burak Yavuz 

Closes #9953 from brkyvz/flaky-ss.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a5d98876
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a5d98876
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a5d98876

Branch: refs/heads/master
Commit: a5d988763319f63a8e2b58673dd4f9098f17c835
Parents: 4d6bbbc
Author: Burak Yavuz 
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 20:58:47 2015 -0800

--
 .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a5d98876/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
     }
     oldLogFiles.foreach { logInfo =>
       if (!executionContext.isShutdown) {
-        val f = Future { deleteFile(logInfo) }(executionContext)
-        if (waitForCompletion) {
-          import scala.concurrent.duration._
-          Await.ready(f, 1 second)
-        }
+        try {
+          val f = Future { deleteFile(logInfo) }(executionContext)
+          if (waitForCompletion) {
+            import scala.concurrent.duration._
+            Await.ready(f, 1 second)
+          }
+        } catch {
+          case e: RejectedExecutionException =>
+            logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
+              "This would not affect recovery correctness.", e)
+        }
       }
     }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [STREAMING][FLAKY-TEST] Catch execution context race condition in `FileBasedWriteAheadLog.close()`

2015-11-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 862d788fc -> b18112666


[STREAMING][FLAKY-TEST] Catch execution context race condition in 
`FileBasedWriteAheadLog.close()`

There is a race condition in `FileBasedWriteAheadLog.close()`: if deletes of old 
log files are still in progress, the write ahead log may close first, resulting 
in a `RejectedExecutionException`. This is harmless and should be handled 
gracefully.

Example test failures:
https://amplab.cs.berkeley.edu/jenkins/job/Spark-1.6-SBT/AMPLAB_JENKINS_BUILD_PROFILE=hadoop1.0,label=spark-test/95/testReport/junit/org.apache.spark.streaming.util/BatchedWriteAheadLogWithCloseFileAfterWriteSuite/BatchedWriteAheadLog___clean_old_logs/

The test fails because `afterEach` calls `writeAheadLog.close` while async 
deletes may still be in flight.

tdas zsxwing

Author: Burak Yavuz 

Closes #9953 from brkyvz/flaky-ss.

(cherry picked from commit a5d988763319f63a8e2b58673dd4f9098f17c835)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1811266
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1811266
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1811266

Branch: refs/heads/branch-1.6
Commit: b18112666adec0a942d1cfe8d6b9f1e7c7201fcd
Parents: 862d788
Author: Burak Yavuz 
Authored: Tue Nov 24 20:58:47 2015 -0800
Committer: Shixiong Zhu 
Committed: Tue Nov 24 20:59:01 2015 -0800

--
 .../streaming/util/FileBasedWriteAheadLog.scala | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b1811266/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
--
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index 72705f1..f5165f7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -17,7 +17,7 @@
 package org.apache.spark.streaming.util
 
 import java.nio.ByteBuffer
-import java.util.concurrent.ThreadPoolExecutor
+import java.util.concurrent.{RejectedExecutionException, ThreadPoolExecutor}
 import java.util.{Iterator => JIterator}
 
 import scala.collection.JavaConverters._
@@ -176,10 +176,16 @@ private[streaming] class FileBasedWriteAheadLog(
     }
     oldLogFiles.foreach { logInfo =>
       if (!executionContext.isShutdown) {
-        val f = Future { deleteFile(logInfo) }(executionContext)
-        if (waitForCompletion) {
-          import scala.concurrent.duration._
-          Await.ready(f, 1 second)
-        }
+        try {
+          val f = Future { deleteFile(logInfo) }(executionContext)
+          if (waitForCompletion) {
+            import scala.concurrent.duration._
+            Await.ready(f, 1 second)
+          }
+        } catch {
+          case e: RejectedExecutionException =>
+            logWarning("Execution context shutdown before deleting old WriteAheadLogs. " +
+              "This would not affect recovery correctness.", e)
+        }
       }
     }

