GitHub user szhem opened a pull request: https://github.com/apache/spark/pull/23083
[SPARK-26114][CORE] ExternalSorter Leak

## What changes were proposed in this pull request?

This pull request fixes [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114). The issue occurs when the number of partitions is reduced with `coalesce` without shuffling after shuffle-based transformations.

Given the following data:

```scala
import org.apache.hadoop.io._
import org.apache.hadoop.io.compress._
import org.apache.commons.lang._
import org.apache.spark._

// generate 100M records of sample data
sc.makeRDD(1 to 1000, 1000)
  .flatMap(item => (1 to 100000)
    .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase) ->
      new Text(RandomStringUtils.randomAlphanumeric(1024))))
  .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec]))
```

and the following job:

```scala
import org.apache.hadoop.io._
import org.apache.spark._
import org.apache.spark.storage._

val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
rdd
  .map(item => item._1.toString -> item._2.toString)
  .repartitionAndSortWithinPartitions(new HashPartitioner(1000))
  .coalesce(10, false)
  .count
```

...executed as follows:

```bash
spark-shell \
  --num-executors=5 \
  --executor-cores=2 \
  --master=yarn \
  --deploy-mode=client \
  --conf spark.executor.memory=1g \
  --conf spark.dynamicAllocation.enabled=false \
  --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
```

...the executors always fail with OutOfMemoryErrors.

The root cause is multiple leaked ExternalSorter references. For example, with 2 tasks per executor you would expect at most 2 simultaneous ExternalSorter instances per executor. The heap dump generated on OutOfMemoryError shows more than that.

![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)

P.S.
This PR does not cover cases involving CoGroupedRDDs, which use ExternalAppendOnlyMap internally. ExternalAppendOnlyMap can itself lead to OutOfMemoryErrors in many places.

## How was this patch tested?

- Existing unit tests
- New unit tests
- Job executions on a live environment

Here is the screenshot before applying this patch:

![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)

Here is the screenshot after applying this patch:

![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)

With the number of executors reduced even further, the job is still stable:

![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/szhem/spark SPARK-26114-externalsorter-leak

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23083

----
commit 9ca5ddbeb2022259ec79e60b3e81332466947d90
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:31:59Z

    Allow memory consumers and spillables to be optionally unregistered and not trackable on freeing up the memory

commit bf57de2c9d23c3cbac23817a3d0b6158739ae8aa
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:34:15Z

    allow to remove already registered task completion listeners if they dont need to be fired at task completion

commit e3531acf2751d4ba63e7fa353b66c70d534271df
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:35:31Z

    clean up readingIterator on stop

commit baa9656e9d9fa2d006ed6fb98257a213bcace588
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:37:07Z

    prevent capturing and holding sub-iterators after completion

commit 3cc54522545f43e0ee9ff9443ba3ea67e5fb9d5b
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:40:13Z

    cleaning up resourses as soon as they no longer needed, dont waiting till the end of the task

commit d36323e7db3d8dc5ec01a2ae46752342ff01fac5
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-18T01:01:15Z

    adding some unit tests

commit 12075ec265f0d09cd52865bb91155898b9ede523
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-18T07:07:27Z

    improved docs

----
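The commit messages in this pull request outline the general idea of the fix:

- Release each sorter's resources as soon as they are no longer needed, instead of waiting until the end of the task.
- Allow an already registered task completion listener to be removed, so it cannot keep those resources reachable.

Below is a minimal, Spark-free sketch of that idea. It assumes, as the commit messages suggest, that a non-shuffling `coalesce` makes one task read many shuffled parent partitions in sequence. It also assumes each parent's sorter stays reachable through a completion-listener closure. `SketchTaskContext`, `SketchSorter`, `runTask`, and the handle-based `removeCompletionListener` are all hypothetical names. They are not Spark's actual `TaskContext` or `ExternalSorter` API.

```scala
import scala.collection.mutable

// Hypothetical model of a task context: callbacks run once at task end.
// Returning a handle so a callback can be removed early is an ASSUMPTION
// modelled on the commit messages, not the real Spark TaskContext API.
final class SketchTaskContext {
  private val listeners = mutable.LinkedHashMap.empty[Long, () => Unit]
  private var nextId = 0L

  def addCompletionListener(f: () => Unit): Long = {
    val id = nextId
    nextId += 1
    listeners(id) = f
    id
  }

  def removeCompletionListener(id: Long): Unit = listeners.remove(id)

  def registeredListeners: Int = listeners.size

  def markTaskCompleted(): Unit = {
    listeners.values.foreach(f => f())
    listeners.clear()
  }
}

// Hypothetical stand-in for an ExternalSorter: pins a buffer until stop().
final class SketchSorter(sizeBytes: Int) {
  private var buffer: Array[Byte] = new Array[Byte](sizeBytes)
  def isStopped: Boolean = buffer == null
  def stop(): Unit = { buffer = null }
}

object SorterLeakSketch {
  final case class Snapshot(liveSorters: Int, listeners: Int, liveAfterCompletion: Int)

  // One task reads `numParents` parent partitions one after another, as a
  // coalesce without shuffle does (1000 partitions into 10 is ~100 per task).
  def runTask(numParents: Int, eagerCleanup: Boolean): Snapshot = {
    val ctx = new SketchTaskContext
    val sorters = mutable.ArrayBuffer.empty[SketchSorter] // demo bookkeeping only
    for (_ <- 0 until numParents) {
      val sorter = new SketchSorter(sizeBytes = 1 << 10)
      sorters += sorter
      // Safety net: this closure captures `sorter`, keeping it reachable until task end.
      val handle = ctx.addCompletionListener(() => sorter.stop())
      // ... this parent partition's records would be consumed here ...
      if (eagerCleanup) {
        // Parent iterator exhausted: free the sorter now and drop its listener.
        sorter.stop()
        ctx.removeCompletionListener(handle)
      }
    }
    val liveBefore = sorters.count(s => !s.isStopped)
    val listenersBefore = ctx.registeredListeners
    ctx.markTaskCompleted()
    Snapshot(liveBefore, listenersBefore, sorters.count(s => !s.isStopped))
  }

  def main(args: Array[String]): Unit = {
    println(runTask(numParents = 100, eagerCleanup = false)) // Snapshot(100,100,0)
    println(runTask(numParents = 100, eagerCleanup = true))  // Snapshot(0,0,0)
  }
}
```

The two runs contrast the behaviours. Without eager cleanup, all 100 sorters stay alive until the task completes, which mirrors the accumulation seen in the heap dump. With eager cleanup, each sorter is released as soon as its parent partition has been consumed. The completion listener is kept only as a safety net for iterators that are abandoned before exhaustion.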