Github user szhem commented on a diff in the pull request:

    https://github.com/apache/spark/pull/23083#discussion_r236057101

--- Diff: core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---
@@ -727,9 +727,10 @@ private[spark] class ExternalSorter[K, V, C](
     spills.clear()
     forceSpillFiles.foreach(s => s.file.delete())
     forceSpillFiles.clear()
-    if (map != null || buffer != null) {
+    if (map != null || buffer != null || readingIterator != null) {
       map = null // So that the memory can be garbage-collected
       buffer = null // So that the memory can be garbage-collected
+      readingIterator = null // So that the memory can be garbage-collected
--- End diff --

@advancedxy I've tried removing all the modifications except this one and got OutOfMemoryErrors once again. Here are the details:

1. There are now 4 `ExternalSorter`s remaining: 2 of them are not yet closed ...

   ![1_readingiterator_isnull_nonclosed_externalsorter](https://user-images.githubusercontent.com/1523889/48973288-2218d180-f04d-11e8-9329-27b3edf33c48.png)

   ... and 2 of them are closed, as expected ...

   ![2_readingiterator_isnull_closed_externalsorter](https://user-images.githubusercontent.com/1523889/48973295-483e7180-f04d-11e8-83cf-23361515363f.png)

2. 2 `SpillableIterator`s belonging to already closed `ExternalSorter`s (and consuming a significant part of the memory) remain

   ![4_readingiterator_isnull_spillableiterator_of_closed_externalsorter](https://user-images.githubusercontent.com/1523889/48973318-cf8be500-f04d-11e8-912f-74be7420ca95.png)

3. These `SpillableIterator`s are referenced by `CompletionIterator`s ...

   ![6_completioniterator_of_blockstoreshufflereader](https://user-images.githubusercontent.com/1523889/48973357-a6b81f80-f04e-11e8-810f-dc8941430f34.png)

   ... which in turn appear to be referenced by the `cur` field ...

   ![7_coalescedrdd_compute_flatmap](https://user-images.githubusercontent.com/1523889/48973491-7e7df000-f051-11e8-8864-7e9e7f3f994b.png)

   ... of the standard `Iterator.flatMap` that is used in the `compute` method of `CoalescedRDD`

   ![image](https://user-images.githubusercontent.com/1523889/48973401-7fae1d80-f04f-11e8-8cf2-043c808173d9.png)

The standard `Iterator.flatMap` does not clear its `cur` field before obtaining the next value for it, which will itself consume quite a lot of memory

![image](https://user-images.githubusercontent.com/1523889/48973418-dfa4c400-f04f-11e8-8f0e-b464567d43de.png)

... and in the case of Spark that means the previous memory-consuming iterator stays reachable while the next value is being fetched

![8_coalescedrdd_compute_flatmap_cur_isnotassigned](https://user-images.githubusercontent.com/1523889/48974089-0dddd000-f05f-11e8-8319-f7d1f778f381.png)

So I've restored the changes to `CompletionIterator` that reassign the reference to its sub-iterator to the `empty` iterator (a sketch of that change follows at the end of this comment) ...

![image](https://user-images.githubusercontent.com/1523889/48973472-27781b00-f051-11e8-86e1-cd6b87cd114b.png)

... and that has helped.

P.S. I believe that clearing the standard `flatMap` iterator's `cur` field before calling `nextCur` could help too:

```scala
def flatMap[B](f: A => GenTraversableOnce[B]): Iterator[B] = new AbstractIterator[B] {
  private var cur: Iterator[B] = empty
  private def nextCur() { cur = f(self.next()).toIterator }
  def hasNext: Boolean = {
    // Equivalent to cur.hasNext || self.hasNext && { nextCur(); hasNext }
    // but slightly shorter bytecode (better JVM inlining!)
    while (!cur.hasNext) {
      cur = empty // suggested: drop the exhausted inner iterator before computing the next one
      if (!self.hasNext) return false
      nextCur()
    }
    true
  }
  def next(): B = (if (hasNext) cur else empty).next()
}
```
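For completeness, here is a small self-contained way to observe the `cur` retention itself. The object and variable names are mine, and since `System.gc()` is only a hint to the JVM the printed result is indicative rather than guaranteed. On Scala 2.12, whose `flatMap` is quoted above, the exhausted first inner iterator is expected to remain reachable through `cur` while `f` computes the second one:

```scala
import java.lang.ref.WeakReference

// Demonstrates that Iterator.flatMap (Scala 2.12) still references the
// exhausted inner iterator through its `cur` field while f computes the next one.
object FlatMapCurRetentionDemo {
  def main(args: Array[String]): Unit = {
    var firstInner: WeakReference[Iterator[Int]] = null

    val it = Iterator(1, 2).flatMap { i =>
      if (i == 2) {
        // The first inner iterator is already exhausted here, but flatMap's
        // `cur` field still points at it while this function runs.
        System.gc() // only a hint; the result is indicative, not guaranteed
        println(s"first inner iterator collected: ${firstInner.get == null}")
      }
      val inner = Iterator.tabulate(3)(_ + i)
      if (i == 1) firstInner = new WeakReference(inner)
      inner
    }

    it.foreach(_ => ()) // expected on Scala 2.12: "first inner iterator collected: false"
  }
}
```

In Spark the inner iterator is not a toy like this one but a `CompletionIterator` over a `SpillableIterator`, so whatever it pins stays in memory for the whole time the next partition's values are being fetched.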
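And here is a minimal sketch of the `CompletionIterator` change mentioned above: the wrapped sub-iterator is held in a mutable field and swapped for `Iterator.empty` once completion fires. The class shape follows Spark's `org.apache.spark.util.CompletionIterator`, but the body is illustrative rather than the exact patch:

```scala
// Sketch: drop the sub-iterator once it is consumed, so it can be
// garbage-collected even while this wrapper itself stays reachable
// (e.g. from flatMap's `cur` field).
abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A] {

  private[this] var completed = false
  // `sub` is only used in this initializer, so scalac does not retain it as a field.
  private[this] var iter: Iterator[A] = sub

  def next(): A = iter.next()

  def hasNext: Boolean = {
    val r = iter.hasNext
    if (!r && !completed) {
      completed = true
      // Swap in an empty iterator so the consumed sub-iterator
      // (e.g. a SpillableIterator) becomes unreachable.
      iter = Iterator.empty
      completion()
    }
    r
  }

  def completion(): Unit
}
```

With this in place, even while the `CompletionIterator` itself stays pinned by `flatMap`'s `cur` field, the heavy sub-iterator behind it becomes collectable as soon as it is exhausted.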