Repository: spark
Updated Branches:
  refs/heads/master 93112e693 -> 438f8fd67


[SPARK-26114][CORE] ExternalSorter's readingIterator field leak

## What changes were proposed in this pull request?

This pull request fixes 
[SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114), an issue 
that occurs when reducing the number of partitions via `coalesce` without 
shuffling after shuffle-based transformations.

The leak occurs because `ExternalSorter`'s `readingIterator` field is not 
cleaned up the way its `map` and `buffer` fields are.
Additionally, `CompletionIterator` is changed so that it no longer holds on 
to its `sub`-iterator once the completion iterator completes. This is 
necessary because in some cases, e.g. with standard Scala's `flatMap` 
iterator (which is used in `CoalescedRDD`'s `compute` method), the next 
value of the main iterator is assigned to `flatMap`'s `cur` field only once 
it becomes available.
For DAGs where a `ShuffledRDD` is a parent of a `CoalescedRDD`, this means 
that data has to be fetched from the map side of the shuffle, and fetching 
it consumes quite a lot of memory on top of the memory already held by the 
iterator in `flatMap`'s `cur` field (until `cur` is reassigned). A minimal 
sketch of this retention pattern is shown below.
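
As an illustration, here is a small self-contained sketch (the names are 
illustrative, not Spark internals) of that retention: `heavyIterator` stands 
in for a `CompletionIterator` over an `ExternalSorter`, and the array 
simulates the sorter's buffers.
```scala
object FlatMapRetentionSketch {
  // Pretend this is a CompletionIterator over an ExternalSorter whose
  // buffers we would like to see garbage-collected as early as possible.
  def heavyIterator(id: Int): Iterator[Int] = {
    val payload = Array.fill(1 << 20)(id) // ~4 MB per inner iterator
    payload.iterator
  }

  def main(args: Array[String]): Unit = {
    val outer = Iterator(1, 2, 3).flatMap(heavyIterator)
    // While flatMap is producing heavyIterator(2), the already-exhausted
    // heavyIterator(1) is still referenced by flatMap's internal `cur`
    // field, so its payload cannot be garbage-collected yet.
    println(outer.size)
  }
}
```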

For the following data
```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 100000)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```

and the following job
```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10, false)
  .count
```
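
For orientation, here is a much-simplified sketch (ours, not the actual 
Spark source) of what `CoalescedRDD`'s `compute` conceptually does; the 
`flatMap` is exactly where the retention described above bites:
```scala
// Each coalesced output partition concatenates several parent partitions.
// flatMap keeps the previous partition's iterator (a CompletionIterator
// over shuffle output) reachable in its `cur` field while the next
// partition's shuffle blocks are being fetched, so both consume memory
// at the same time.
def computeCoalesced[T](parentPartitions: Iterator[Iterator[T]]): Iterator[T] =
  parentPartitions.flatMap(identity)
```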

... executed as follows
```bash
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memoryOverhead=512 \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```

... the executors consistently fail with OutOfMemoryErrors.

The main issue is that multiple `ExternalSorter` references are leaked.
For example, with 2 tasks per executor there should be at most 2 
simultaneous instances of `ExternalSorter` per executor, but the heap dump 
generated on OutOfMemoryError shows that there are more.

![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)

P.S. This PR does not cover the case of `CoGroupedRDD`s, which use 
`ExternalAppendOnlyMap` internally; that map can itself lead to 
OutOfMemoryErrors in many places.

## How was this patch tested?

- Existing unit tests
- New unit tests
- Job executions on the live environment

Here is a screenshot taken before applying this patch:
![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)

Here is a screenshot taken after applying this patch:
![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)
And even when the number of executors is reduced further, the job remains 
stable:
![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)

Closes #23083 from szhem/SPARK-26114-externalsorter-leak.

Authored-by: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/438f8fd6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/438f8fd6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/438f8fd6

Branch: refs/heads/master
Commit: 438f8fd675d8f819373b6643dea3a77d954b6822
Parents: 93112e6
Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com>
Authored: Wed Nov 28 20:22:24 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Nov 28 20:22:24 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/util/CompletionIterator.scala  |  7 +++++--
 .../spark/util/collection/ExternalSorter.scala  |  3 ++-
 .../spark/util/CompletionIteratorSuite.scala    | 22 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/438f8fd6/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
index 21acaa9..f4d6c7a 100644
--- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
+++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala
@@ -25,11 +25,14 @@ private[spark]
 abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] {
 
   private[this] var completed = false
-  def next(): A = sub.next()
+  private[this] var iter = sub
+  def next(): A = iter.next()
   def hasNext: Boolean = {
-    val r = sub.hasNext
+    val r = iter.hasNext
     if (!r && !completed) {
       completed = true
+      // reassign to release resources of highly resource consuming iterators early
+      iter = Iterator.empty.asInstanceOf[I]
       completion()
     }
     r

http://git-wip-us.apache.org/repos/asf/spark/blob/438f8fd6/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index eac3db0..46279e7 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
     spills.clear()
     forceSpillFiles.foreach(s => s.file.delete())
     forceSpillFiles.clear()
-    if (map != null || buffer != null) {
+    if (map != null || buffer != null || readingIterator != null) {
       map = null // So that the memory can be garbage-collected
       buffer = null // So that the memory can be garbage-collected
+      readingIterator = null // So that the memory can be garbage-collected
       releaseMemory()
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/438f8fd6/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
index 688fcd9..29421f7 100644
--- a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.util
 
+import java.lang.ref.PhantomReference
+import java.lang.ref.ReferenceQueue
+
 import org.apache.spark.SparkFunSuite
 
 class CompletionIteratorSuite extends SparkFunSuite {
@@ -44,4 +47,23 @@ class CompletionIteratorSuite extends SparkFunSuite {
     assert(!completionIter.hasNext)
     assert(numTimesCompleted === 1)
   }
+  test("reference to sub iterator should not be available after completion") {
+    var sub = Iterator(1, 2, 3)
+
+    val refQueue = new ReferenceQueue[Iterator[Int]]
+    val ref = new PhantomReference[Iterator[Int]](sub, refQueue)
+
+    val iter = CompletionIterator[Int, Iterator[Int]](sub, {})
+    sub = null
+    iter.toArray
+
+    for (_ <- 1 to 100 if !ref.isEnqueued) {
+      System.gc()
+      if (!ref.isEnqueued) {
+        Thread.sleep(10)
+      }
+    }
+    assert(ref.isEnqueued)
+    assert(refQueue.poll() === ref)
+  }
 }

