[SPARK-6132] ContextCleaner race condition across SparkContexts

The problem is that `ContextCleaner` may clean variables that belong to a 
different `SparkContext`. This can happen if the `SparkContext` to which the 
cleaner belongs stops and a new one is started immediately afterwards in the 
same JVM. If the cleaner is in the middle of cleaning a broadcast, for 
instance, it does so through `SparkEnv.get.blockManager`, which by that point 
may belong to the new `SparkContext`.
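
For context, here is a minimal sketch of the failure mode, assuming Spark 
1.x's process-global `SparkEnv` (the object name `CleanerRaceSketch` and the 
app names are purely illustrative, not part of Spark):

```scala
import org.apache.spark.SparkContext

// Illustrative reproduction of the race; names here are hypothetical.
object CleanerRaceSketch {
  def main(args: Array[String]): Unit = {
    val sc1 = new SparkContext("local", "first-app")
    val data = sc1.broadcast(Seq(1, 2, 3))
    sc1.stop()  // sc1's ContextCleaner thread may still be mid-cleanup here

    // A second context in the same JVM replaces the process-global SparkEnv.
    val sc2 = new SparkContext("local", "second-app")

    // If sc1's cleaner now cleans `data`, it looks up the block manager via
    // SparkEnv.get.blockManager, which at this point is sc2's, so the removal
    // targets blocks owned by the wrong SparkContext.
    sc2.stop()
  }
}
```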

JoshRosen and I suspect that this is the cause of many flaky tests, most 
notably the `JavaAPISuite`. We were able to reproduce the failure locally, 
though it is nondeterministic and very hard to trigger.

Author: Andrew Or <and...@databricks.com>

Closes #4869 from andrewor14/cleaner-masquerade and squashes the following 
commits:

29168c0 [Andrew Or] Synchronize ContextCleaner stop


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06d883c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06d883c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06d883c3

Branch: refs/heads/branch-1.2
Commit: 06d883c3735986c88585565d4e66a5231431a4b8
Parents: a2a94a1
Author: Andrew Or <and...@databricks.com>
Authored: Tue Mar 3 13:44:05 2015 -0800
Committer: Sean Owen <so...@cloudera.com>
Committed: Sun Mar 22 13:05:11 2015 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/ContextCleaner.scala | 35 ++++++++++++++------
 1 file changed, 24 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/06d883c3/core/src/main/scala/org/apache/spark/ContextCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
index ede1e23..201e5ec 100644
--- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala
@@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
        val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
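
The fix above follows a general shutdown idiom: the worker blocks on its 
queue outside the lock (so `stop()` can interrupt an idle wait) but runs each 
task inside the lock (so an in-flight task can never be interrupted), and 
`join()` keeps `stop()` from returning before the thread exits. Here is a 
self-contained sketch of that idiom, with hypothetical names (`Worker`, 
`submit`, `shutdown`) that are not Spark APIs:

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical worker mirroring ContextCleaner's shutdown structure.
class Worker extends Thread {
  @volatile private var stopped = false
  private val queue = new LinkedBlockingQueue[Runnable]()

  def submit(task: Runnable): Unit = queue.put(task)

  override def run(): Unit = {
    while (!stopped) {
      try {
        // Block outside the lock so shutdown() can interrupt an idle wait.
        val task = Option(queue.poll(100, TimeUnit.MILLISECONDS))
        // Run the task inside the lock so shutdown() can never interrupt
        // a task that is already in flight.
        synchronized {
          task.foreach(_.run())
        }
      } catch {
        case _: InterruptedException if stopped => // expected during shutdown
      }
    }
  }

  def shutdown(): Unit = {
    stopped = true
    // Wait for any in-flight task to release the lock, then wake the
    // thread out of its blocking poll.
    synchronized {
      interrupt()
    }
    // Do not return until the thread has actually exited.
    join()
  }
}
```

With this ordering, a task submitted before `shutdown()` is either run to 
completion or never started; it can never be interrupted halfway through.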

