[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15895502#comment-15895502 ]
Imran Rashid commented on SPARK-19659:
--------------------------------------

I think Reynold has a good point. I really don't like the idea of always having the MapStatus track 2k sizes -- I already have to regularly recommend that users bump their partition count above 2k to avoid an OOM from too many CompressedMapStatus instances. Going over 2k partitions generally gives big memory savings from using HighlyCompressedMapStatus.

Your point about deciding how many outliers to track is valid; but I think there are a lot of other options you might consider as well, e.g., track all the sizes that are more than 2x the average, or track a few different size buckets and keep a bit set for each bucket, etc. These should allow the MapStatus to stay very compact, but with bounded error on the sizes (a rough sketch of the bucket idea is at the end of this message).

For implementation, I'd also break your proposal down into smaller pieces. In fact, the three ideas are all useful independently (though they are more robust together). But there are two larger pieces I see missing:

1) How will we test the changes out? Not for correctness, but for performance / stability benefits?

2) Are there metrics we should be collecting, that we currently are not, so we can better answer these questions? E.g., the distribution of sizes in a MapStatus is not stored anywhere for later analysis (though it's not easy to come up with a good way to store them, since there are n^2 sizes in one shuffle); how much memory is used by the network layer; how much error there is in the sizes from the MapStatus; etc.

I think some parts can be implemented anyway, behind feature flags (perhaps undocumented), but it's something to keep in mind.

> Fetch big blocks to disk when shuffle-read
> ------------------------------------------
>
>                 Key: SPARK-19659
>                 URL: https://issues.apache.org/jira/browse/SPARK-19659
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>         Attachments: SPARK-19659-design-v1.pdf
>
> Currently the whole block is fetched into memory (off-heap by default) when shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can be large in skew situations. If OOM happens during shuffle read, the job will be killed and users will be notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM, but this approach is not perfectly suitable for production environments, especially for data warehouses.
> Using Spark SQL as the data engine in a warehouse, users hope to have a unified parameter (e.g. memory) with less resource wasted (resource allocated but not used).
> It's not always easy to predict skew situations; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM. This approach was mentioned during the discussion in SPARK-3019 by [~sandyr] and [~mridulm80].
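
A minimal sketch (in Scala) of the behavior the description above proposes: when a block's reported size crosses a threshold, stream it to a local temp file instead of buffering it in memory. This is not the actual Spark change; the object name FetchSketch, the threshold maxBlockSizeInMemory, and the fetch signature are hypothetical, invented only to illustrate the decision point.

{code:scala}
import java.io.{File, InputStream}
import java.nio.file.{Files, StandardCopyOption}

// Illustrative sketch only -- not a real Spark patch.
object FetchSketch {
  // Hypothetical threshold; in a real patch this would come from a Spark config.
  val maxBlockSizeInMemory: Long = 64L * 1024 * 1024

  sealed trait FetchedBlock
  case class InMemoryBlock(bytes: Array[Byte]) extends FetchedBlock
  case class OnDiskBlock(file: File) extends FetchedBlock

  // Decide, per block, whether to buffer in memory or stream to disk,
  // based on the size reported by the map-side status.
  def fetch(blockId: String, reportedSize: Long, open: () => InputStream): FetchedBlock = {
    val in = open()
    try {
      if (reportedSize <= maxBlockSizeInMemory) {
        // Small block: read fully into memory, as today.
        InMemoryBlock(in.readAllBytes())
      } else {
        // Big block: stream to a local temp file so one skewed block cannot OOM the executor.
        val file = File.createTempFile("shuffle-" + blockId, ".data")
        Files.copy(in, file.toPath, StandardCopyOption.REPLACE_EXISTING)
        OnDiskBlock(file)
      }
    } finally {
      in.close()
    }
  }
}
{code}

Note that the decision relies on the size reported by the MapStatus, which is why the accuracy of those sizes (discussed in the comment above) matters for this change.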
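
And re the bucketed-sizes idea in the comment above: a minimal sketch, assuming a hypothetical class name (BucketedSizes) and arbitrary bucket boundaries at 2x/4x/8x the average. It is not Spark's HighlyCompressedMapStatus; it only illustrates how a handful of bit sets can bound the error on block sizes while staying far smaller than one size per partition.

{code:scala}
import java.util.BitSet

// Sketch of a compact size summary: the average plus a few bit sets,
// one per size bucket, instead of one size per reduce partition.
class BucketedSizes(sizes: Array[Long]) {
  private val avg: Long = if (sizes.isEmpty) 0L else sizes.sum / sizes.length

  // Bucket lower bounds, as multiples of the average (arbitrary choice).
  private val boundaries: Array[Double] = Array(2.0, 4.0, 8.0)

  // Bit i of buckets(b) is set when partition i's size lands in bucket b,
  // i.e. it is >= boundaries(b) * avg and below the next boundary (if any).
  private val buckets: Array[BitSet] =
    Array.fill(boundaries.length)(new BitSet(sizes.length))

  for (i <- sizes.indices) {
    val b = boundaries.lastIndexWhere(m => sizes(i) >= avg * m)
    if (b >= 0) buckets(b).set(i)
  }

  // Estimate one partition's size: the average for "normal" partitions, and the
  // upper edge of its bucket for large ones (the last bucket is open-ended, so
  // the estimate is capped at twice its lower bound).
  def estimate(reduceId: Int): Long = {
    val b = buckets.lastIndexWhere(_.get(reduceId))
    if (b < 0) avg
    else if (b == boundaries.length - 1) (avg * boundaries(b) * 2).toLong
    else (avg * boundaries(b + 1)).toLong
  }
}
{code}

With three buckets the payload for 2k partitions is three 2000-bit sets plus the average (well under a kilobyte), and the error for any partition that falls in a bucket is bounded by that bucket's width.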