GitHub user szhem opened a pull request:

    https://github.com/apache/spark/pull/23083

    [SPARK-26114][CORE] ExternalSorter Leak

    ## What changes were proposed in this pull request?
    
    This pull request fixes
    [SPARK-26114](https://issues.apache.org/jira/browse/SPARK-26114), an issue
    that occurs when reducing the number of partitions with `coalesce` (without
    shuffling) after shuffle-based transformations.
    
    For the following data
    ```scala
    import org.apache.hadoop.io._ 
    import org.apache.hadoop.io.compress._ 
    import org.apache.commons.lang._ 
    import org.apache.spark._ 
    
    // generate 100M records of sample data 
    sc.makeRDD(1 to 1000, 1000) 
      .flatMap(item => (1 to 100000) 
        .map(i => new Text(RandomStringUtils.randomAlphanumeric(3).toLowerCase)
          -> new Text(RandomStringUtils.randomAlphanumeric(1024))))
      .saveAsSequenceFile("/tmp/random-strings", Some(classOf[GzipCodec])) 
    ```
    
    and the following job
    ```scala
    import org.apache.hadoop.io._
    import org.apache.spark._
    import org.apache.spark.storage._
    
    val rdd = sc.sequenceFile("/tmp/random-strings", classOf[Text], classOf[Text])
    rdd 
      .map(item => item._1.toString -> item._2.toString) 
      .repartitionAndSortWithinPartitions(new HashPartitioner(1000)) 
      .coalesce(10, shuffle = false)
      .count 
    ```
    
    ... executed as follows
    ```bash
    spark-shell \
      --num-executors=5 \
      --executor-cores=2 \
      --master=yarn \
      --deploy-mode=client \
      --conf spark.executor.memory=1g \
      --conf spark.dynamicAllocation.enabled=false \
      --conf spark.executor.extraJavaOptions='-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp -Dio.netty.noUnsafe=true'
    ```
    
    ... the executors consistently fail with OutOfMemoryErrors.
    
    The main issue is that multiple ExternalSorter references leak.
    For example, with 2 tasks per executor one would expect at most 2
    simultaneous ExternalSorter instances per executor, but a heap dump taken
    on OutOfMemoryError shows significantly more of them.
    
    
![run1-noparams-dominator-tree-externalsorter](https://user-images.githubusercontent.com/1523889/48703665-782ce580-ec05-11e8-95a9-d6c94e8285ab.png)
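
    To make the retention pattern concrete, here is a minimal, self-contained
    sketch of the leak mechanism. The classes below are simplified stand-ins,
    not Spark's actual ExternalSorter or TaskContext: a completion listener
    closure captures its sorter, so the sorter stays strongly reachable until
    the whole task finishes, even after its output was consumed. In this repro,
    `coalesce(10, shuffle = false)` over 1000 shuffle partitions means each
    task drains roughly 100 upstream partitions, so the retained sorters
    accumulate within a single task:
    ```scala
    import scala.collection.mutable.ArrayBuffer
    
    // Stand-in for ExternalSorter: pins a chunk of memory until stop() is called.
    final class FakeSorter(id: Int) {
      val buffer = new Array[Byte](1024 * 1024) // memory held while reachable
      def stop(): Unit = println(s"sorter $id stopped")
    }
    
    // Stand-in for TaskContext: completion listeners fire only when the
    // whole task finishes.
    final class FakeTaskContext {
      private val listeners = ArrayBuffer.empty[() => Unit]
      def addTaskCompletionListener(f: () => Unit): Unit = listeners += f
      def markTaskCompleted(): Unit = { listeners.foreach(_.apply()); listeners.clear() }
    }
    
    object LeakDemo extends App {
      val ctx = new FakeTaskContext
      // One coalesced task reading ~100 parent partitions, one sorter each.
      (1 to 100).foreach { i =>
        val sorter = new FakeSorter(i)
        // The closure captures `sorter`, so every sorter (and its buffer)
        // stays strongly reachable from the context until the task completes,
        // even though its output was consumed long ago.
        ctx.addTaskCompletionListener(() => sorter.stop())
      }
      // ~100 MB of dead sorters are still reachable here.
      ctx.markTaskCompleted() // only now is everything released
    }
    ```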
    
    
    P.S. This PR does not cover CoGroupedRDDs, which use ExternalAppendOnlyMap
    internally; that map can itself lead to OutOfMemoryErrors in many places.
    
    ## How was this patch tested?
    
    - Existing unit tests
    - New unit tests
    - Job executions on the live environment
    
    Here is the screenshot before applying this patch
    
![run3-noparams-failure-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700395-f769eb80-ebfc-11e8-831b-e94c757d416c.png)
    
    Here is the screenshot after applying this patch
    
![run3-noparams-success-ui-5x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700610-7a8b4180-ebfd-11e8-9761-baaf38a58e66.png)
    Even after reducing the number of executors further, the job remains stable:
    
![run3-noparams-success-ui-2x2-repartition-and-sort](https://user-images.githubusercontent.com/1523889/48700619-82e37c80-ebfd-11e8-98ed-a38e1f1f1fd9.png)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/szhem/spark SPARK-26114-externalsorter-leak

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/23083.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23083
    
----
commit 9ca5ddbeb2022259ec79e60b3e81332466947d90
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:31:59Z

    Allow memory consumers and spillables to be optionally unregistered and no
    longer tracked when freeing up memory

commit bf57de2c9d23c3cbac23817a3d0b6158739ae8aa
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:34:15Z

    Allow removing already registered task completion listeners if they don't
    need to be fired at task completion

commit e3531acf2751d4ba63e7fa353b66c70d534271df
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:35:31Z

    clean up readingIterator on stop

commit baa9656e9d9fa2d006ed6fb98257a213bcace588
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:37:07Z

    prevent capturing and holding sub-iterators after completion

commit 3cc54522545f43e0ee9ff9443ba3ea67e5fb9d5b
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-05T22:40:13Z

    Clean up resources as soon as they are no longer needed instead of waiting
    until the end of the task

commit d36323e7db3d8dc5ec01a2ae46752342ff01fac5
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-18T01:01:15Z

    adding some unit tests

commit 12075ec265f0d09cd52865bb91155898b9ede523
Author: Sergey Zhemzhitsky <szhemzhitski@...>
Date:   2018-11-18T07:07:27Z

    improved docs

----
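
The commits above revolve around one pattern: release a sorter's resources, and drop references to it, as soon as its output iterator is exhausted, rather than waiting for the task-completion callback. Below is a minimal sketch of that pattern, loosely modeled on Spark's CompletionIterator but with hypothetical names and simplifications, not the PR's actual code:

```scala
// Eager-release wrapper: once the wrapped iterator is drained, run the
// cleanup immediately and null out the delegate so whatever it captures
// (e.g. a sorter) becomes garbage-collectable right away.
final class EagerCompletionIterator[A](sub: Iterator[A], completion: () => Unit)
    extends Iterator[A] {
  private[this] var delegate: Iterator[A] = sub
  private[this] var completed = false

  override def hasNext: Boolean = {
    val more = delegate != null && delegate.hasNext
    if (!more && !completed) {
      completed = true
      delegate = null // stop holding the sub-iterator and its captures
      completion()    // free memory now, not at the end of the task
    }
    more
  }

  override def next(): A =
    if (delegate == null) throw new NoSuchElementException("next on empty iterator")
    else delegate.next()
}

// Usage: stop the sorter the moment its output has been fully read, e.g.
// new EagerCompletionIterator(sorter.iterator, () => sorter.stop())
```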


---
