Repository: spark
Updated Branches:
  refs/heads/master 00c02728a -> f16bc68df


[SPARK-9824] [CORE] Fix the issue that InternalAccumulator leaks WeakReference

`InternalAccumulator.create` doesn't call `registerAccumulatorForCleanup` to 
register the accumulators it creates with `ContextCleaner`, so the 
`WeakReference`s held for these accumulators in `Accumulators.originals` are 
never removed.

This PR adds a `registerAccumulatorForCleanup` call for each internal 
accumulator to avoid the memory leak.
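
For context, a minimal, self-contained sketch of the leak pattern in plain 
Scala (hypothetical names, not Spark's actual classes): a map of 
`WeakReference`s lets its referents be garbage collected, but the map entries 
themselves stay behind until something explicitly removes them, which is what 
the cleanup registration enables.

    import java.lang.ref.WeakReference

    import scala.collection.mutable

    object WeakRegistrySketch {
      final class Acc(val id: Long)

      // Analogous in spirit to Accumulators.originals: id -> WeakReference.
      // The WeakReference lets each Acc be collected, but the entry remains.
      private val originals = mutable.Map[Long, WeakReference[Acc]]()

      def register(a: Acc): Unit = originals(a.id) = new WeakReference(a)

      // The hook a cleaner must call once the referent is unreachable;
      // without it, entries pile up for the lifetime of the application.
      def remove(id: Long): Unit = originals.remove(id)

      def main(args: Array[String]): Unit = {
        register(new Acc(1L)) // no strong reference kept anywhere
        System.gc()
        // The referent may be gone, but the map entry is still present:
        println(s"entries before cleanup: ${originals.size}") // 1
        remove(1L)
        println(s"entries after cleanup: ${originals.size}") // 0
      }
    }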

Author: zsxwing <zsxw...@gmail.com>

Closes #8108 from zsxwing/internal-accumulators-leak.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f16bc68d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f16bc68d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f16bc68d

Branch: refs/heads/master
Commit: f16bc68dfb25c7b746ae031a57840ace9bafa87f
Parents: 00c0272
Author: zsxwing <zsxw...@gmail.com>
Authored: Tue Aug 11 14:06:23 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Aug 11 14:06:23 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Accumulators.scala   | 22 ++++++++++++--------
 .../org/apache/spark/scheduler/Stage.scala      |  2 +-
 .../org/apache/spark/AccumulatorSuite.scala     |  3 ++-
 3 files changed, 16 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f16bc68d/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 064246d..c39c866 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -382,14 +382,18 @@ private[spark] object InternalAccumulator {
    * add to the same set of accumulators. We do this to report the distribution of accumulator
    * values across all tasks within each stage.
    */
-  def create(): Seq[Accumulator[Long]] = {
-    Seq(
-      // Execution memory refers to the memory used by internal data structures created
-      // during shuffles, aggregations and joins. The value of this accumulator should be
-      // approximately the sum of the peak sizes across all such data structures created
-      // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
-      new Accumulator(
-        0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
-    ) ++ maybeTestAccumulator.toSeq
+  def create(sc: SparkContext): Seq[Accumulator[Long]] = {
+    val internalAccumulators = Seq(
+        // Execution memory refers to the memory used by internal data structures created
+        // during shuffles, aggregations and joins. The value of this accumulator should be
+        // approximately the sum of the peak sizes across all such data structures created
+        // in this task. For SQL jobs, this only tracks all unsafe operators and ExternalSort.
+        new Accumulator(
+          0L, AccumulatorParam.LongAccumulatorParam, Some(PEAK_EXECUTION_MEMORY), internal = true)
+      ) ++ maybeTestAccumulator.toSeq
+    internalAccumulators.foreach { accumulator =>
+      sc.cleaner.foreach(_.registerAccumulatorForCleanup(accumulator))
+    }
+    internalAccumulators
   }
 }
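
Note on the shape of the fix: `sc.cleaner` is an `Option[ContextCleaner]` (a 
cleaner may be absent, for example when reference tracking is disabled), so 
registration is guarded with `foreach` and silently becomes a no-op in that 
case. A minimal sketch of the same pattern with hypothetical stand-ins 
(`Cleaner`, `Context`), not Spark's real types:

    // Hypothetical stand-ins; only the Option-guarded registration
    // pattern mirrors the real change.
    class Cleaner {
      def registerForCleanup(id: Long): Unit = println(s"registered $id")
    }

    class Context(val cleaner: Option[Cleaner])

    object CreateSketch {
      def create(ctx: Context): Seq[Long] = {
        val ids = Seq(1L, 2L, 3L)
        // Register each created id with the cleaner, if one exists; when
        // the cleaner is None, the inner foreach simply does nothing.
        ids.foreach { id => ctx.cleaner.foreach(_.registerForCleanup(id)) }
        ids
      }

      def main(args: Array[String]): Unit = {
        create(new Context(Some(new Cleaner))) // prints three registrations
        create(new Context(None))              // registers nothing
      }
    }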

http://git-wip-us.apache.org/repos/asf/spark/blob/f16bc68d/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index de05ee2..1cf0685 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -81,7 +81,7 @@ private[spark] abstract class Stage(
    * accumulators here again will override partial values from the finished tasks.
    */
   def resetInternalAccumulators(): Unit = {
-    _internalAccumulators = InternalAccumulator.create()
+    _internalAccumulators = InternalAccumulator.create(rdd.sparkContext)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/f16bc68d/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
index 48f5495..0eb2293 100644
--- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala
@@ -160,7 +160,8 @@ class AccumulatorSuite extends SparkFunSuite with Matchers with LocalSparkContex
   }
 
   test("internal accumulators in TaskContext") {
-    val accums = InternalAccumulator.create()
+    sc = new SparkContext("local", "test")
+    val accums = InternalAccumulator.create(sc)
     val taskContext = new TaskContextImpl(0, 0, 0, 0, null, null, accums)
     val internalMetricsToAccums = taskContext.internalMetricsToAccumulators
     val collectedInternalAccums = taskContext.collectInternalAccumulators()

