Repository: spark
Updated Branches:
  refs/heads/master 643300a6e -> 2df5f1f00


[SPARK-6075] Fix bug that caused lost accumulator updates: do not store
WeakReferences in localAccums map

This fixes a non-deterministic bug introduced in #4021 that could cause tasks'
accumulator updates to be lost.  The problem is that `localAccums` should not
hold weak references: after a task finishes running, there are no remaining
strong references to its local accumulators, so they can be garbage-collected
before the executor reads the `localAccums` map.  We don't need weak references
here anyway, since this map is cleared at the end of each task.
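
To illustrate the failure mode outside of Spark, here is a minimal,
self-contained sketch (hypothetical names; a `StringBuilder` stands in for an
accumulator).  When a per-task value is held only through a WeakReference, a
GC between task completion and the read can clear the referent, and the update
is silently dropped:

    import java.lang.ref.WeakReference
    import scala.collection.mutable

    object LostUpdateDemo {
      // Stand-in for the old localAccums: per-task values held only weakly.
      private val localAccums = mutable.Map[Long, WeakReference[StringBuilder]]()

      def main(args: Array[String]): Unit = {
        // The "task" registers its local accumulator; no strong reference
        // survives past the end of this block.
        locally {
          val acc = new StringBuilder("partial result")
          localAccums(1L) = new WeakReference(acc)
        }
        System.gc() // only a hint, but often enough to clear the weak ref
        // The "executor" reads the map afterwards; get() may return null,
        // in which case the update is lost -- the SPARK-6075 symptom.
        Option(localAccums(1L).get()) match {
          case Some(acc) => println(s"update delivered: $acc")
          case None      => println("update lost")
        }
      }
    }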

Author: Josh Rosen <joshro...@databricks.com>

Closes #4835 from JoshRosen/SPARK-6075 and squashes the following commits:

4f4b5b2 [Josh Rosen] Remove defensive assertions that caused test failures in 
code unrelated to this change
120c7b0 [Josh Rosen] [SPARK-6075] Do not store WeakReferences in localAccums map


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2df5f1f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2df5f1f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2df5f1f0

Branch: refs/heads/master
Commit: 2df5f1f00661cd31b9fc37e80345a3f5f856c95f
Parents: 643300a
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sat Feb 28 22:51:01 2015 -0800
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Sat Feb 28 22:51:01 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 40 +++++++++++---------
 1 file changed, 23 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2df5f1f0/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 30f0ccd..bcf8324 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -280,15 +280,24 @@ object AccumulatorParam {
 
 // TODO: The multi-thread support in accumulators is kind of lame; check
 // if there's a more intuitive way of doing it right
-private[spark] object Accumulators {
-  // Store a WeakReference instead of a StrongReference because this way accumulators can be
-  // appropriately garbage collected during long-running jobs and release memory
-  type WeakAcc = WeakReference[Accumulable[_, _]]
-  val originals = Map[Long, WeakAcc]()
-  val localAccums = new ThreadLocal[Map[Long, WeakAcc]]() {
-    override protected def initialValue() = Map[Long, WeakAcc]()
+private[spark] object Accumulators extends Logging {
+  /**
+   * This global map holds the original accumulator objects that are created on the driver.
+   * It keeps weak references to these objects so that accumulators can be garbage-collected
+   * once the RDDs and user-code that reference them are cleaned up.
+   */
+  val originals = Map[Long, WeakReference[Accumulable[_, _]]]()
+
+  /**
+   * This thread-local map holds per-task copies of accumulators; it is used to collect the set
+   * of accumulator updates to send back to the driver when tasks complete. After tasks complete,
+   * this map is cleared by `Accumulators.clear()` (see Executor.scala).
+   */
+  private val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
+    override protected def initialValue() = Map[Long, Accumulable[_, _]]()
   }
-  var lastId: Long = 0
+
+  private var lastId: Long = 0
 
   def newId(): Long = synchronized {
     lastId += 1
@@ -297,16 +306,16 @@ private[spark] object Accumulators {
 
   def register(a: Accumulable[_, _], original: Boolean): Unit = synchronized {
     if (original) {
-      originals(a.id) = new WeakAcc(a)
+      originals(a.id) = new WeakReference[Accumulable[_, _]](a)
     } else {
-      localAccums.get()(a.id) = new WeakAcc(a)
+      localAccums.get()(a.id) = a
     }
   }
 
   // Clear the local (non-original) accumulators for the current thread
   def clear() {
     synchronized {
-      localAccums.get.clear
+      localAccums.get.clear()
     }
   }
 
@@ -320,12 +329,7 @@ private[spark] object Accumulators {
   def values: Map[Long, Any] = synchronized {
     val ret = Map[Long, Any]()
     for ((id, accum) <- localAccums.get) {
-      // Since we are now storing weak references, we must check whether the underlying data
-      // is valid.
-      ret(id) = accum.get match {
-        case Some(values) => values.localValue
-        case None => None
-      }
+      ret(id) = accum.localValue
     }
     return ret
   }
@@ -341,6 +345,8 @@ private[spark] object Accumulators {
           case None =>
            throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
         }
+      } else {
+        logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
       }
     }
   }
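
For contrast, here is a compact model of the patched design (a hypothetical
`MiniAccumulators` object, not Spark's API): per-task locals are held
strongly, so `values` is safe to read at any point before `clear()`, while
only the driver-side `originals` map keeps weak references:

    import java.lang.ref.WeakReference
    import scala.collection.mutable

    object MiniAccumulators {
      // Driver-side originals: weak references, so unused accumulators can
      // still be garbage-collected during long-running jobs.
      val originals = mutable.Map[Long, WeakReference[StringBuilder]]()

      // Per-task locals: strong references, cleared explicitly after each task.
      private val localAccums = new ThreadLocal[mutable.Map[Long, StringBuilder]]() {
        override protected def initialValue() = mutable.Map[Long, StringBuilder]()
      }

      def register(id: Long, acc: StringBuilder, original: Boolean): Unit =
        if (original) originals(id) = new WeakReference(acc)
        else localAccums.get()(id) = acc

      // Safe even after a GC: the map itself keeps the values alive.
      def values: Map[Long, String] =
        localAccums.get.map { case (id, acc) => id -> acc.toString }.toMap

      def clear(): Unit = localAccums.get.clear()
    }

    object Demo extends App {
      locally { MiniAccumulators.register(1L, new StringBuilder("42"), original = false) }
      System.gc()
      println(MiniAccumulators.values) // Map(1 -> 42): the update survives GC
      MiniAccumulators.clear()         // mirrors the executor's end-of-task cleanup
    }

In the real code the clearing happens in Executor.scala after each task, which
is why the strong references in `localAccums` cannot accumulate or leak.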

