Repository: spark Updated Branches: refs/heads/master 93112e693 -> 438f8fd67
[SPARK-26114][CORE] ExternalSorter's readingIterator field leak ## What changes were proposed in this pull request? This pull request fixes [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114) issue that occurs when trying to reduce the number of partitions by means of coalesce without shuffling after shuffle-based transformations. The leak occurs because of not cleaning up `ExternalSorter`'s `readingIterator` field as it's done for its `map` and `buffer` fields. Additionally there are changes to the `CompletionIterator` to prevent capturing its `sub`-iterator and holding it even after the completion iterator completes. It is necessary because in some cases, e.g. in case of standard scala's `flatMap` iterator (which is used is `CoalescedRDD`'s `compute` method) the next value of the main iterator is assigned to `flatMap`'s `cur` field only after it is available. For DAGs where ShuffledRDD is a parent of CoalescedRDD it means that the data should be fetched from the map-side of the shuffle, but the process of fetching this data consumes quite a lot of memory in addition to the memory already consumed by the iterator held by `flatMap`'s `cur` field (until it is reassigned). For the following data ```scala import org.apache.hadoop.io._ import org.apache.hadoop.io.compress._ import org.apache.commons.lang._ import org.apache.spark._ // generate 100M records of sample data sc.makeRDD(1 to 1000, 1000) .flatMap(item => (1 to 100000) .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) -> new Text(RandomStringUtils.randomAlphanumeric(1024)))) .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) ``` and the following job ```scala import org.apache.hadoop.io._ import org.apache.spark._ import org.apache.spark.storage._ val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text]) rdd .map(item => item._1.toString -> item._2.toString) .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) .coalesce(10,false) .count ``` ... executed like the following ```bash spark-shell \ --num-executors=5 \ --executor-cores=2 \ --master=yarn \ --deploy-mode=client \ --conf spark.executor.memoryOverhead=512 \ --conf spark.executor.memory=1g \ --conf spark.dynamicAllocation.enabled=false \ --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true' ``` ... executors are always failing with OutOfMemoryErrors. The main issue is multiple leaks of ExternalSorter references. For example, in case of 2 tasks per executor it is expected to be 2 simultaneous instances of ExternalSorter per executor but heap dump generated on OutOfMemoryError shows that there are more ones. ![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png) P.S. This PR does not cover cases with CoGroupedRDDs which use ExternalAppendOnlyMap internally, which itself can lead to OutOfMemoryErrors in many places. ## How was this patch tested? - Existing unit tests - New unit tests - Job executions on the live environment Here is the screenshot before applying this patch ![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png) Here is the screenshot after applying this patch ![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png) And in case of reducing the number of executors even more the job is still stable ![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png) Closes #23083 from szhem/SPARK-26114-externalsorter-leak. Authored-by: Sergey Zhemzhitsky <szhemzhit...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/438f8fd6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/438f8fd6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/438f8fd6 Branch: refs/heads/master Commit: 438f8fd675d8f819373b6643dea3a77d954b6822 Parents: 93112e6 Author: Sergey Zhemzhitsky <szhemzhit...@gmail.com> Authored: Wed Nov 28 20:22:24 2018 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Nov 28 20:22:24 2018 +0800 ---------------------------------------------------------------------- .../apache/spark/util/CompletionIterator.scala | 7 +++++-- .../spark/util/collection/ExternalSorter.scala | 3 ++- .../spark/util/CompletionIteratorSuite.scala | 22 ++++++++++++++++++++ 3 files changed, 29 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/438f8fd6/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala index 21acaa9..f4d6c7a 100644 --- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala @@ -25,11 +25,14 @@ private[spark] abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterator[A] { private[this] var completed = false - def next(): A = sub.next() + private[this] var iter = sub + def next(): A = iter.next() def hasNext: Boolean = { - val r = sub.hasNext + val r = iter.hasNext if (!r && !completed) { completed = true + // reassign to release resources of highly resource consuming iterators early + iter = Iterator.empty.asInstanceOf[I] completion() } r http://git-wip-us.apache.org/repos/asf/spark/blob/438f8fd6/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index eac3db0..46279e7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C]( spills.clear() forceSpillFiles.foreach(s => s.file.delete()) forceSpillFiles.clear() - if (map != null || buffer != null) { + if (map != null || buffer != null || readingIterator != null) { map = null // So that the memory can be garbage-collected buffer = null // So that the memory can be garbage-collected + readingIterator = null // So that the memory can be garbage-collected releaseMemory() } } http://git-wip-us.apache.org/repos/asf/spark/blob/438f8fd6/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala index 688fcd9..29421f7 100644 --- a/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/CompletionIteratorSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.util +import java.lang.ref.PhantomReference +import java.lang.ref.ReferenceQueue + import org.apache.spark.SparkFunSuite class CompletionIteratorSuite extends SparkFunSuite { @@ -44,4 +47,23 @@ class CompletionIteratorSuite extends SparkFunSuite { assert(!completionIter.hasNext) assert(numTimesCompleted === 1) } + test("reference to sub iterator should not be available after completion") { + var sub = Iterator(1, 2, 3) + + val refQueue = new ReferenceQueue[Iterator[Int]] + val ref = new PhantomReference[Iterator[Int]](sub, refQueue) + + val iter = CompletionIterator[Int, Iterator[Int]](sub, {}) + sub = null + iter.toArray + + for (_ <- 1 to 100 if !ref.isEnqueued) { + System.gc() + if (!ref.isEnqueued) { + Thread.sleep(10) + } + } + assert(ref.isEnqueued) + assert(refQueue.poll() === ref) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org