Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21369#discussion_r209480323
  
    --- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
    @@ -414,7 +424,112 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
         sc.stop()
       }
     
    -  test("external aggregation updates peak execution memory") {
    +  test("SPARK-22713 spill during iteration leaks internal map") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    +
    +    map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
    +    assert(map.numSpills == 0, "map was not supposed to spill")
    +
    +    val it = map.iterator
    +    assert(it.isInstanceOf[CompletionIterator[_, _]])
    +    // org.apache.spark.util.collection.AppendOnlyMap.destructiveSortedIterator returns
    +    // an instance of an anonymous Iterator class.
    +
    +    val underlyingMapRef = WeakReference(map.currentMap)
    +
    +    {
    +      // direct asserts introduced some macro-generated code that held a reference to the map
    +      val tmpIsNull = null == underlyingMapRef.get.orNull
    +      assert(!tmpIsNull)
    +    }
    +
    +    val first50Keys = for (_ <- 0 until 50) yield {
    +      val (k, vs) = it.next
    +      val sortedVs = vs.sorted
    +      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
    +      k
    +    }
    +    assert(map.numSpills == 0)
    +    map.spill(Long.MaxValue, null)
    +    // these asserts try to show that we're no longer holding references to the underlying map.
    +    // it'd be nice to use something like
    +    // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
    +    // (lines 69-89)
    +    // assert(map.currentMap == null)
    +    eventually {
    +      System.gc()
    +      // direct asserts introduced some macro-generated code that held a reference to the map
    +      val tmpIsNull = null == underlyingMapRef.get.orNull
    +      assert(tmpIsNull)
    +    }

    +    val next50Keys = for (_ <- 0 until 50) yield {
    +      val (k, vs) = it.next
    +      val sortedVs = vs.sorted
    +      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
    +      k
    +    }
    +    assert(!it.hasNext)
    +    val keys = (first50Keys ++ next50Keys).sorted
    +    assert(keys == (0 until 100))
    +  }
    +
    +  test("drop all references to the underlying map once the iterator is 
exhausted") {
    +    val size = 1000
    +    val conf = createSparkConf(loadDefaults = true)
    +    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
    +    val map = createExternalMap[Int]
    +
    +    map.insertAll((0 until size).iterator.map(i => (i / 10, i)))
    +    assert(map.numSpills == 0, "map was not supposed to spill")
    +
    +    val underlyingMapRef = WeakReference(map.currentMap)
    +
    +    {
    +      // direct asserts introduced some macro-generated code that held a reference to the map
    +      val tmpIsNull = null == underlyingMapRef.get.orNull
    +      assert(!tmpIsNull)
    +    }
    +
    +    val it = map.iterator
    +    assert(it.isInstanceOf[CompletionIterator[_, _]])

    +    val keys = it.map { case (k, vs) =>
    +      val sortedVs = vs.sorted
    +      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
    +      k
    +    }.toList.sorted
    +
    +    assert(it.isEmpty)
    +    assert(keys == (0 until 100))
    +
    +    assert(map.numSpills == 0)
    +    // these asserts try to show that we're no longer holding references to the underlying map.
    +    // it'd be nice to use something like
    +    // https://github.com/scala/scala/blob/2.13.x/test/junit/scala/tools/testing/AssertUtil.scala
    +    // (lines 69-89)
    +    assert(map.currentMap == null)
    +
    +    eventually {
    --- End diff ---
    
    ditto
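    
    As a side note on the pattern these tests rely on: they take a `WeakReference` to the internal map, drop every strong reference, and then call `System.gc()` inside an `eventually` retry until the weak reference reports empty (GC is a request, not a guarantee, hence the retry). The `tmpIsNull` indirection exists because Scalatest's `assert` macro captures its argument expression, which would itself keep the map strongly reachable. Below is a minimal, self-contained sketch of the idiom in plain Scala; the `eventually` helper here is a hypothetical stand-in for Scalatest's `Eventually.eventually`, and the byte-array payload is illustrative rather than Spark code:
    
        import scala.ref.WeakReference
        
        object WeakRefGcSketch {
          // Hypothetical retry helper standing in for Scalatest's Eventually.eventually:
          // re-run the assertion until it passes or the retry budget is spent.
          def eventually(maxTries: Int = 10)(assertion: => Unit): Unit = {
            var tries = 0
            var passed = false
            while (!passed) {
              try { assertion; passed = true }
              catch {
                case e: AssertionError =>
                  tries += 1
                  if (tries >= maxTries) throw e
                  Thread.sleep(100)
              }
            }
          }
        
          def main(args: Array[String]): Unit = {
            var payload = new Array[Byte](1 << 20)  // illustrative stand-in for the internal map
            val ref = WeakReference(payload)
            assert(ref.get.isDefined)               // still strongly reachable
        
            payload = null                          // drop the only strong reference
            eventually() {
              System.gc()                           // a request, not a guarantee, hence the retry
              assert(ref.get.isEmpty)               // cleared once the payload has been collected
            }
          }
        }
    
    This appears to be essentially what the AssertUtil helper linked in the diff's comments packages up behind a single assertion.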

