Repository: spark
Updated Branches:
  refs/heads/master 26775e3c8 -> 2e3abdff2


[SPARK-22713][CORE] ExternalAppendOnlyMap leaks when spilled during iteration

## What changes were proposed in this pull request?
This PR solves [SPARK-22713](https://issues.apache.org/jira/browse/SPARK-22713), 
which describes a memory leak that occurs when an ExternalAppendOnlyMap is 
spilled during iteration (as opposed to during insertion).

ExternalAppendOnlyMap's iterator supports spilling, but it kept a reference to 
the internal map (via an internal iterator) after spilling. The original code 
apparently intended to drop this reference on the next iteration, but according 
to the detailed investigation described in the JIRA this never happened.
The fix simply replaces the internal iterator immediately after spilling.
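
As a rough sketch of the pattern (simplified, not the exact Spark code; `spillToDisk` and 
`freeMap` are stand-ins for ExternalAppendOnlyMap's `spillMemoryIteratorToDisk` and 
`freeCurrentMap`), the iterator now swaps its `upstream` reference as soon as the spill 
happens, and the in-memory map is only freed by a completion callback once iteration finishes:

```scala
// Simplified sketch of the fix, not the exact Spark code.
class SpillableIteratorSketch[T](
    initial: Iterator[T],
    spillToDisk: Iterator[T] => Iterator[T],   // stands in for spillMemoryIteratorToDisk
    freeMap: () => Unit                        // stands in for freeCurrentMap
  ) extends Iterator[T] {

  private val SPILL_LOCK = new Object()
  private var upstream: Iterator[T] = initial

  def spill(): Unit = SPILL_LOCK.synchronized {
    // Replace the reference to the in-memory iterator immediately,
    // rather than deferring the swap to the next read (the old, leaky behaviour).
    upstream = spillToDisk(upstream)
  }

  // Invoked when iteration completes (via a CompletionIterator in the real code).
  def destroy(): Unit = SPILL_LOCK.synchronized {
    freeMap()
    upstream = Iterator.empty
  }

  override def hasNext: Boolean = SPILL_LOCK.synchronized(upstream.hasNext)
  override def next(): T = SPILL_LOCK.synchronized(upstream.next())
}
```

In the real code the callback is wired up through `toCompletionIterator`, which wraps the 
SpillableIterator in a CompletionIterator that calls `destroy()` once the iterator is exhausted.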

## How was this patch tested?
I've added a new test to ExternalAppendOnlyMapSuite that asserts that neither 
the external map itself nor its iterator hold any reference to the internal map 
after a spill.
This approach required relaxing the access of some member variables and nested 
classes of ExternalAppendOnlyMap; these members are now package private 
and annotated with VisibleForTesting.
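
The core of the leak check looks roughly like the sketch below (a simplified illustration, 
not the exact test code; the real assertions are in the ExternalAppendOnlyMapSuite diff 
further down). The test keeps only a `WeakReference` to the in-memory map, forces a GC, and 
retries via ScalaTest's `eventually` until the reference is cleared. The null check goes 
through a local val because asserting on the map directly would let ScalaTest's assert macro 
capture a strong reference and keep the map alive:

```scala
import scala.ref.WeakReference

import org.scalatest.Assertions
import org.scalatest.concurrent.Eventually

// Hypothetical helper mirroring the new test's leak check; `ref` would wrap
// ExternalAppendOnlyMap.currentMap, which is now package-private for testing.
object LeakCheckSketch extends Eventually with Assertions {
  def assertEventuallyCollected(ref: WeakReference[AnyRef]): Unit = {
    eventually {
      System.gc()
      // Bind the result to a local val so the assert macro does not retain the map.
      val cleared = ref.get.orNull == null
      assert(cleared)
    }
  }
}
```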

Closes #21369 from eyalfa/SPARK-22713__ExternalAppendOnlyMap_effective_spill.

Authored-by: Eyal Farago <e...@nrgene.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e3abdff
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e3abdff
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e3abdff

Branch: refs/heads/master
Commit: 2e3abdff23a0725b80992cc30dba2ecf9c2e7fd3
Parents: 26775e3
Author: Eyal Farago <e...@nrgene.com>
Authored: Mon Aug 13 20:55:46 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Mon Aug 13 20:55:46 2018 +0800

----------------------------------------------------------------------
 .../util/collection/ExternalAppendOnlyMap.scala |  35 +++---
 .../collection/ExternalAppendOnlyMapSuite.scala | 119 ++++++++++++++++++-
 2 files changed, 138 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2e3abdff/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index d83da0d..19ff109 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -80,7 +80,10 @@ class ExternalAppendOnlyMap[K, V, C](
     this(createCombiner, mergeValue, mergeCombiners, serializer, blockManager, TaskContext.get())
   }
 
-  @volatile private var currentMap = new SizeTrackingAppendOnlyMap[K, C]
+  /**
+   * Exposed for testing
+   */
+  @volatile private[collection] var currentMap = new SizeTrackingAppendOnlyMap[K, C]
   private val spilledMaps = new ArrayBuffer[DiskMapIterator]
   private val sparkConf = SparkEnv.get.conf
   private val diskBlockManager = blockManager.diskBlockManager
@@ -267,7 +270,7 @@ class ExternalAppendOnlyMap[K, V, C](
    */
   def destructiveIterator(inMemoryIterator: Iterator[(K, C)]): Iterator[(K, C)] = {
     readingIterator = new SpillableIterator(inMemoryIterator)
-    readingIterator
+    readingIterator.toCompletionIterator
   }
 
   /**
@@ -280,8 +283,7 @@ class ExternalAppendOnlyMap[K, V, C](
         "ExternalAppendOnlyMap.iterator is destructive and should only be 
called once.")
     }
     if (spilledMaps.isEmpty) {
-      CompletionIterator[(K, C), Iterator[(K, C)]](
-        destructiveIterator(currentMap.iterator), freeCurrentMap())
+      destructiveIterator(currentMap.iterator)
     } else {
       new ExternalIterator()
     }
@@ -305,8 +307,8 @@ class ExternalAppendOnlyMap[K, V, C](
 
     // Input streams are derived both from the in-memory map and spilled maps on disk
     // The in-memory map is sorted in place, while the spilled maps are already in sorted order
-    private val sortedMap = CompletionIterator[(K, C), Iterator[(K, C)]](destructiveIterator(
-      currentMap.destructiveSortedIterator(keyComparator)), freeCurrentMap())
+    private val sortedMap = destructiveIterator(
+      currentMap.destructiveSortedIterator(keyComparator))
     private val inputStreams = (Seq(sortedMap) ++ spilledMaps).map(it => it.buffered)
 
     inputStreams.foreach { it =>
@@ -568,13 +570,11 @@ class ExternalAppendOnlyMap[K, V, C](
     context.addTaskCompletionListener[Unit](context => cleanup())
   }
 
-  private[this] class SpillableIterator(var upstream: Iterator[(K, C)])
+  private class SpillableIterator(var upstream: Iterator[(K, C)])
     extends Iterator[(K, C)] {
 
     private val SPILL_LOCK = new Object()
 
-    private var nextUpstream: Iterator[(K, C)] = null
-
     private var cur: (K, C) = readNext()
 
     private var hasSpilled: Boolean = false
@@ -585,17 +585,24 @@ class ExternalAppendOnlyMap[K, V, C](
       } else {
         logInfo(s"Task ${context.taskAttemptId} force spilling in-memory map 
to disk and " +
           s"it will release 
${org.apache.spark.util.Utils.bytesToString(getUsed())} memory")
-        nextUpstream = spillMemoryIteratorToDisk(upstream)
+        val nextUpstream = spillMemoryIteratorToDisk(upstream)
+        assert(!upstream.hasNext)
         hasSpilled = true
+        upstream = nextUpstream
         true
       }
     }
 
+    private def destroy(): Unit = {
+      freeCurrentMap()
+      upstream = Iterator.empty
+    }
+
+    def toCompletionIterator: CompletionIterator[(K, C), SpillableIterator] = {
+      CompletionIterator[(K, C), SpillableIterator](this, this.destroy)
+    }
+
     def readNext(): (K, C) = SPILL_LOCK.synchronized {
-      if (nextUpstream != null) {
-        upstream = nextUpstream
-        nextUpstream = null
-      }
       if (upstream.hasNext) {
         upstream.next()
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e3abdff/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 35312f2..d542ba0 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -17,14 +17,24 @@
 
 package org.apache.spark.util.collection
 
+import java.util.Objects
+
 import scala.collection.mutable.ArrayBuffer
+import scala.ref.WeakReference
+
+import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually
 
 import org.apache.spark._
 import org.apache.spark.internal.config._
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.memory.MemoryTestingUtils
+import org.apache.spark.util.CompletionIterator
 
-class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
+class ExternalAppendOnlyMapSuite extends SparkFunSuite
+  with LocalSparkContext
+  with Eventually
+  with Matchers{
   import TestUtils.{assertNotSpilled, assertSpilled}
 
   private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
@@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     sc.stop()
   }
 
-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+    val size = 1000
+    val conf = createSparkConf(loadDefaults = true)
+    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+    val map = createExternalMap[Int]
+
+    map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+    assert(map.numSpills == 0, "map was not supposed to spill")
+
+    val it = map.iterator
+    assert(it.isInstanceOf[CompletionIterator[_, _]])
+    // org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns
+    // an instance of an anonymous Iterator class.
+
+    val underlyingMapRef = WeakReference(map.currentMap)
+
+    {
+      // direct asserts introduced some macro generated code that held a reference to the map
+      val tmpIsNull = null == underlyingMapRef.get.orNull
+      assert(!tmpIsNull)
+    }
+
+    val first50Keys = for ( _ <- 0 until 50) yield {
+      val (k, vs) = it.next
+      val sortedVs = vs.sorted
+      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+      k
+    }
+    assert(map.numSpills == 0)
+    map.spill(Long.MaxValue, null)
+    // these asserts try to show that we're no longer holding references to the underlying map.
+    // it'd be nice to use something like
+    // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+    // (lines 69-89)
+    // assert(map.currentMap == null)
+    eventually {
+      System.gc()
+      // direct asserts introduced some macro generated code that held a reference to the map
+      val tmpIsNull = null == underlyingMapRef.get.orNull
+      assert(tmpIsNull)
+    }
+
+
+    val next50Keys = for ( _ <- 0 until 50) yield {
+      val (k, vs) = it.next
+      val sortedVs = vs.sorted
+      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+      k
+    }
+    assert(!it.hasNext)
+    val keys = (first50Keys ++ next50Keys).sorted
+    assert(keys == (0 until 100))
+  }
+
+  test("drop all references to the underlying map once the iterator is 
exhausted") {
+    val size = 1000
+    val conf = createSparkConf(loadDefaults = true)
+    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+    val map = createExternalMap[Int]
+
+    map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
+    assert(map.numSpills == 0, "map was not supposed to spill")
+
+    val underlyingMapRef = WeakReference(map.currentMap)
+
+    {
+      // direct asserts introduced some macro generated code that held a reference to the map
+      val tmpIsNull = null == underlyingMapRef.get.orNull
+      assert(!tmpIsNull)
+    }
+
+    val it = map.iterator
+    assert( it.isInstanceOf[CompletionIterator[_, _]])
+
+
+    val keys = it.map{
+      case (k, vs) =>
+        val sortedVs = vs.sorted
+        assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+        k
+    }
+    .toList
+    .sorted
+
+    assert(it.isEmpty)
+    assert(keys == (0 until 100))
+
+    assert(map.numSpills == 0)
+    // these asserts try to show that we're no longer holding references to the underlying map.
+    // it'd be nice to use something like
+    // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
+    // (lines 69-89)
+    assert(map.currentMap == null)
+
+    eventually {
+      Thread.sleep(500)
+      System.gc()
+      // direct asserts introduced some macro generated code that held a reference to the map
+      val tmpIsNull = null == underlyingMapRef.get.orNull
+      assert(tmpIsNull)
+    }
+
+    assert(it.toList.isEmpty)
+  }
+
+  test("SPARK-22713 external aggregation updates peak execution memory") {
     val spillThreshold = 1000
     val conf = createSparkConf(loadDefaults = false)
       .set("spark.shuffle.spill.numElementsForceSpillThreshold", 
spillThreshold.toString)


