[SPARK-6132] ContextCleaner race condition across SparkContexts The problem is that `ContextCleaner` may clean variables that belong to a different `SparkContext`. This can happen if the `SparkContext` to which the cleaner belongs stops, and a new one is started immediately afterwards in the same JVM. In this case, if the cleaner is in the middle of cleaning a broadcast, for instance, it will do so through `SparkEnv.get.blockManager`, which could be one that belongs to a different `SparkContext`.
JoshRosen and I suspect that this is the cause of many flaky tests, most notably the `JavaAPISuite`. We were able to reproduce the failure locally (though it is not deterministic and very hard to reproduce). Author: Andrew Or <and...@databricks.com> Closes #4869 from andrewor14/cleaner-masquerade and squashes the following commits: 29168c0 [Andrew Or] Synchronize ContextCleaner stop Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06d883c3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06d883c3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06d883c3 Branch: refs/heads/branch-1.2 Commit: 06d883c3735986c88585565d4e66a5231431a4b8 Parents: a2a94a1 Author: Andrew Or <and...@databricks.com> Authored: Tue Mar 3 13:44:05 2015 -0800 Committer: Sean Owen <so...@cloudera.com> Committed: Sun Mar 22 13:05:11 2015 +0000 ---------------------------------------------------------------------- .../scala/org/apache/spark/ContextCleaner.scala | 35 ++++++++++++++------ 1 file changed, 24 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/06d883c3/core/src/main/scala/org/apache/spark/ContextCleaner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ContextCleaner.scala b/core/src/main/scala/org/apache/spark/ContextCleaner.scala index ede1e23..201e5ec 100644 --- a/core/src/main/scala/org/apache/spark/ContextCleaner.scala +++ b/core/src/main/scala/org/apache/spark/ContextCleaner.scala @@ -104,9 +104,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { cleaningThread.start() } - /** Stop the cleaner. */ + /** + * Stop the cleaning thread and wait until the thread has finished running its current task. + */ def stop() { stopped = true + // Interrupt the cleaning thread, but wait until the current task has finished before + // doing so. This guards against the race condition where a cleaning thread may + // potentially clean similarly named variables created by a different SparkContext, + // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132). + synchronized { + cleaningThread.interrupt() + } + cleaningThread.join() } /** Register a RDD for cleanup when it is garbage collected. */ @@ -135,16 +145,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging { try { val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) .map(_.asInstanceOf[CleanupTaskWeakReference]) - reference.map(_.task).foreach { task => - logDebug("Got cleaning task " + task) - referenceBuffer -= reference.get - task match { - case CleanRDD(rddId) => - doCleanupRDD(rddId, blocking = blockOnCleanupTasks) - case CleanShuffle(shuffleId) => - doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) - case CleanBroadcast(broadcastId) => - doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) + // Synchronize here to avoid being interrupted on stop() + synchronized { + reference.map(_.task).foreach { task => + logDebug("Got cleaning task " + task) + referenceBuffer -= reference.get + task match { + case CleanRDD(rddId) => + doCleanupRDD(rddId, blocking = blockOnCleanupTasks) + case CleanShuffle(shuffleId) => + doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) + case CleanBroadcast(broadcastId) => + doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) + } } } } catch { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org