Repository: spark
Updated Branches:
  refs/heads/master 643300a6e -> 2df5f1f00
[SPARK-6075] Fix bug that caused lost accumulator updates: do not store WeakReferences in localAccums map

This fixes a non-deterministic bug introduced in #4021 that could cause tasks' accumulator updates to be lost. The problem is that `localAccums` should not hold weak references: after a task finishes running, there are no strong references left to its local accumulators, so they can be garbage-collected before the executor reads the `localAccums` map. We don't need weak references here anyway, since this map is cleared at the end of each task.

Author: Josh Rosen <joshro...@databricks.com>

Closes #4835 from JoshRosen/SPARK-6075 and squashes the following commits:

4f4b5b2 [Josh Rosen] Remove defensive assertions that caused test failures in code unrelated to this change
120c7b0 [Josh Rosen] [SPARK-6075] Do not store WeakReferences in localAccums map


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2df5f1f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2df5f1f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2df5f1f0

Branch: refs/heads/master
Commit: 2df5f1f00661cd31b9fc37e80345a3f5f856c95f
Parents: 643300a
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sat Feb 28 22:51:01 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sat Feb 28 22:51:01 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2df5f1f0/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 30f0ccd..bcf8324 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -280,15 +280,24 @@ object AccumulatorParam {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0
 
   def newId(): Long = synchronized {
     lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
     }
   }
 
@@ -320,12 +329,7 @@ private[spark] object Accumulators {
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
     for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
     }
     return ret
   }
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
           case None =>
             throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
         }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
       }
     }
   }
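
To see why weak references in `localAccums` lose updates, here is a minimal, self-contained Scala sketch of the failure mode the commit message describes. It is not Spark code: `LocalAccum`, `WeakRefDemo`, and the single-entry map are hypothetical stand-ins for `Accumulable` and the pre-fix `localAccums`. Once the task drops its last strong reference, a GC cycle may clear the weak reference before the executor reads the map, and the update is silently lost.

    import scala.collection.mutable
    import scala.ref.WeakReference

    // Hypothetical stand-in for a task-local accumulator; not Spark's Accumulable.
    class LocalAccum(var value: Long)

    object WeakRefDemo extends App {
      // Before the fix, localAccums held weak references, as this map does.
      val weakAccums = mutable.Map[Long, WeakReference[LocalAccum]]()

      // A "task" registers its local accumulator and applies an update.
      var accum = new LocalAccum(0)
      weakAccums(1L) = WeakReference(accum)
      accum.value += 42

      // The task finishes: the last strong reference to the accumulator goes away.
      accum = null
      System.gc() // only a hint; collection is not guaranteed, so this repro is best-effort

      // The "executor" now reads the map, as Accumulators.values did before the fix.
      weakAccums(1L).get match {
        case Some(a) => println(s"update survived: ${a.value}")
        case None    => println("update lost: accumulator was garbage-collected")
      }
    }

Holding the accumulators themselves, as the patch does, cannot lose updates this way, and it does not leak memory either: per the new doc comment above, the executor calls `Accumulators.clear()` when each task completes, which removes the strong references.
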
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org