[ 
https://issues.apache.org/jira/browse/SPARK-27734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16893018#comment-16893018
 ] 

Paweł Wiejacha commented on SPARK-27734:
----------------------------------------

We also encountered this problem. We reduced it to shuffling 60 GiB of data 
divided into 5 partitions using *repartitionAndSortWithinPartitions*() and 
processing all of them with *foreachPartition*() on a single executor that has 
2 GiB of memory assigned. Processing each partition takes ~70 minutes, of which 
52 minutes is GC time, and CPU usage is very high because of GC.

Setting *spark.shuffle.spill.numElementsForceSpillThreshold* is very 
inconvenient, so it would be nice to accept Adrian's pull request.
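
To illustrate why an element-count threshold is awkward to tune, here is a 
minimal sketch of the arithmetic, assuming you want a buffer to stay near a 
fixed memory budget. The function name, the 512 MiB budget, and the per-stage 
record sizes are made up for illustration and are not Spark APIs or measured 
values.

```python
def elements_for_budget(memory_budget_bytes: int, avg_record_bytes: int) -> int:
    """Element count that keeps roughly memory_budget_bytes buffered.

    Hypothetical helper: Spark does not provide this function.
    """
    if memory_budget_bytes <= 0 or avg_record_bytes <= 0:
        raise ValueError("budget and record size must be positive")
    return max(1, memory_budget_bytes // avg_record_bytes)


if __name__ == "__main__":
    budget = 512 * 1024 * 1024  # assumed budget: 512 MiB
    # Assumed average record sizes for two shuffles in the same job.
    for stage, avg_record_bytes in {"stage_a": 100, "stage_b": 10_000}.items():
        threshold = elements_for_budget(budget, avg_record_bytes)
        print(f"{stage}: spill after about {threshold} elements")
```

The two stages need thresholds that differ by a factor of 100, so a single 
element count cannot fit both shuffles when record sizes differ this much.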

> Add memory based thresholds for shuffle spill
> ---------------------------------------------
>
>                 Key: SPARK-27734
>                 URL: https://issues.apache.org/jira/browse/SPARK-27734
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 3.0.0
>            Reporter: Adrian Muraru
>            Priority: Minor
>
> When running large shuffles (700 TB of input data, 200k map tasks, 50k 
> reducers on a 300-node cluster), the job regularly runs out of memory in the 
> map and reduce phases.
> IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap and 
> ExternalSorter (reduce side) try to max out the available execution 
> memory. This in turn doesn't play well with the garbage collector, and 
> executors fail with OutOfMemoryError when the memory allocated by these 
> in-memory structures maxes out the available heap (in our case we are 
> running with 9 cores and 32G per executor).
> To mitigate this, I set 
> {{spark.shuffle.spill.numElementsForceSpillThreshold}} to force the spill to 
> disk. While this config works, it is not flexible enough because it is 
> expressed as a number of elements, and in our case we run multiple shuffles 
> in a single job and the element size differs from one stage to another.
> We have an internal patch to extend this behaviour and add two new parameters 
> to control the spill based on memory usage:
> - spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold
> - spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold
>  
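
The difference between a count-based and a size-based spill threshold can be 
sketched with a toy buffer. This is not Spark's implementation and does not 
show how the proposed parameters behave; `SpillingBuffer`, `max_elements`, 
`max_bytes`, and `peak_for` are hypothetical names, and "spilling" here just 
empties the buffer instead of sorting and writing to disk.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class SpillingBuffer:
    """Toy in-memory buffer that spills on an element or byte limit."""

    max_elements: Optional[int] = None  # count-based limit (hypothetical)
    max_bytes: Optional[int] = None     # size-based limit (hypothetical)
    records: List[bytes] = field(default_factory=list)
    buffered_bytes: int = 0
    peak_bytes: int = 0
    spill_count: int = 0

    def insert(self, record: bytes) -> None:
        self.records.append(record)
        self.buffered_bytes += len(record)
        self.peak_bytes = max(self.peak_bytes, self.buffered_bytes)
        if self._should_spill():
            self._spill()

    def _should_spill(self) -> bool:
        over_count = (self.max_elements is not None
                      and len(self.records) >= self.max_elements)
        over_bytes = (self.max_bytes is not None
                      and self.buffered_bytes >= self.max_bytes)
        return over_count or over_bytes

    def _spill(self) -> None:
        # A real sorter would write the records to disk; the toy drops them.
        self.records.clear()
        self.buffered_bytes = 0
        self.spill_count += 1


def peak_for(record_size: int, n_records: int, **limits: int) -> int:
    """Insert n_records of record_size bytes and return the peak buffered bytes."""
    buf = SpillingBuffer(**limits)
    for _ in range(n_records):
        buf.insert(b"x" * record_size)
    return buf.peak_bytes


if __name__ == "__main__":
    for size in (100, 10_000):
        by_count = peak_for(size, 5000, max_elements=1000)
        by_bytes = peak_for(size, 5000, max_bytes=200_000)
        print(f"record={size}B peak by count={by_count}B, by bytes={by_bytes}B")
```

In this toy model the count-based limit lets peak buffered memory grow in 
proportion to record size, while the byte-based limit holds it at the 
configured size for both record sizes.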



