[ https://issues.apache.org/jira/browse/SPARK-27734?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16893018#comment-16893018 ]
Paweł Wiejacha commented on SPARK-27734:
----------------------------------------

We also encountered this problem. We reduced it to shuffling 60 GiB of data divided into 5 partitions using *repartitionAndSortWithinPartitions*() and processing all of them (*foreachPartition*()) with a single executor that has 2 GiB of memory assigned. Processing each partition takes ~70 minutes (52 min of GC time) and CPU usage is very high (due to GC). Setting *spark.shuffle.spill.numElementsForceSpillThreshold* is very inconvenient, so it would be nice to accept Adrian's pull request.

> Add memory based thresholds for shuffle spill
> ---------------------------------------------
>
>                 Key: SPARK-27734
>                 URL: https://issues.apache.org/jira/browse/SPARK-27734
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core, SQL
>    Affects Versions: 3.0.0
>            Reporter: Adrian Muraru
>            Priority: Minor
>
> When running large shuffles (700 TB of input data, 200k map tasks, 50k reducers
> on a 300-node cluster) the job regularly OOMs in the map and reduce phases.
> IIUC, ShuffleExternalSorter (map side) and ExternalAppendOnlyMap and
> ExternalSorter (reduce side) try to max out the available execution memory.
> This in turn doesn't play nicely with the garbage collector, and executors
> fail with OutOfMemoryError when the memory allocated by these in-memory
> structures maxes out the available heap size (in our case we run with
> 9 cores/executor, 32G per executor).
> To mitigate this, I set
> {{spark.shuffle.spill.numElementsForceSpillThreshold}} to force the spill to
> disk. While this config works, it is not flexible enough: it is expressed as a
> number of elements, and in our case we run multiple shuffles in a single job
> and element size differs from one stage to another.
> We have an internal patch that extends this behaviour and adds two new
> parameters to control the spill based on memory usage:
> - spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold
> - spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
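For reference, the existing element-count workaround and the two proposed memory-based parameters could be set together in spark-defaults.conf roughly as follows. This is only a sketch: the two map/reduce parameter names come from the internal patch described above and are not available in upstream Spark, and the threshold values shown are illustrative, not recommendations.

```properties
# Existing upstream knob: force a spill after this many records,
# regardless of their byte size. Awkward to tune when record sizes
# differ between shuffles in the same job.
spark.shuffle.spill.numElementsForceSpillThreshold          10000000

# Proposed knobs from the internal patch (NOT in upstream Spark):
# spill once the in-memory shuffle data exceeds a size threshold,
# independently for the map and reduce sides.
spark.shuffle.spill.map.maxRecordsSizeForSpillThreshold     1g
spark.shuffle.spill.reduce.maxRecordsSizeForSpillThreshold  1g
```

A size-based threshold decouples spill behaviour from per-stage record size, which is the flexibility the element-count knob lacks.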