[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080351#comment-16080351 ] Wenchen Fan commented on SPARK-19659: - I think spark 1.6 client can talk to spark 2.2 shuffle service, but spark 2.2 client can not talk to spark 1.6 shuffle service, so this feature is disabled by default for safety. cc [~jinxing6...@126.com] to confirm. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing >Assignee: jin xing > Fix For: 2.2.0 > > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16080340#comment-16080340 ] Thomas Graves commented on SPARK-19659: --- can you be more specific here, do you mean that if you want to use this feature you have to use the shuffle service from 2.2.0? Is that shuffle service compatible with older versions of Spark? ie spark 1.6 can still talk to 2.2.0 shuffle service. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing >Assignee: jin xing > Fix For: 2.2.0 > > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16069396#comment-16069396 ] Apache Spark commented on SPARK-19659: -- User 'zsxwing' has created a pull request for this issue: https://github.com/apache/spark/pull/18467 > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing >Assignee: jin xing > Fix For: 2.2.0 > > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.4.14#64029) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16025900#comment-16025900 ] Apache Spark commented on SPARK-19659: -- User 'cloud-fan' has created a pull request for this issue: https://github.com/apache/spark/pull/18117 > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing >Assignee: jin xing > Fix For: 2.2.0 > > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15971025#comment-15971025 ] jin xing commented on SPARK-19659: -- [~cloud_fan] I refined the the pr. In current change, I'd propose: 1. Track average size and also the outliers(which are larger than 2*avgSize) in MapStatus; 2. Request memory from *MemoryManager* before fetch blocks and release the memory to MemoryManager when *ManagedBuffer* is released. 3. Fetch remote blocks to disk when failing acquiring memory from MemoryManager, otherwise fetch to memory. I adjust the default value of "spark.memory.offHeap.size" to be 384m(the same with spark.yarn.executor.memoryOverhead), which will be used to initialize the off heap memory pool. What's more, I think *spark.memory.offHeap.enabled* (which is documented in *configuration.md* as "If true, Spark will attempt to use off-heap memory for certain operations. ") might be confusing. Because no matter it is true or false, remote blocks will be shuffle read to off heap by default. It would great if you could take a rough look at the pr and help comment. Thus I can know if I'm on the right direction :) > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965452#comment-15965452 ] jin xing commented on SPARK-19659: -- Yes, I think it's a good idea to leverage memory manager instead of using spark.reducer.maxBytesShuffleToMemory. I will try refine the PR :) > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15965302#comment-15965302 ] Wenchen Fan commented on SPARK-19659: - Seems your model is, all shuffle blocks should be fetched into disk, but we have a buffer to put some of the shuffle blocks in memory, and the buffer size is controlled by {{spark.reducer.maxBytesShuffleToMemory}}. Overall I think this is a good model, but instead of using {{spark.reducer.maxBytesShuffleToMemory}}, can we leverage memory manager and allocate as much memory as possible for this buffer? > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964596#comment-15964596 ] jin xing commented on SPARK-19659: -- *bytesShuffleToMemory* is different from *bytesInFlight*. *bytesInFlight* is for only one *ShuffleBlockFetcherIterator* and get decreased when the remote blocks is received. *bytesShuffleToMemory* is for all ShuffleBlockFetcherIterators and get decreased only when reference count of ByteBuf is zero(though the memory maybe still inside cache and not really destroyed). If *spark.reducer.maxReqsInFlight* is only for memory control, I think *spark.reducer.maxBytesShuffleToMemory* is an improvement. In the current PR, I want to simplify the logic and the memory is tracked by *bytesShuffleToMemory* and memory usage is not tracked by MemoryManager. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964587#comment-15964587 ] jin xing commented on SPARK-19659: -- [~cloud_fan] Thanks a lot for taking look into this and sorry for late reply. My proposal is as below: 1. Track average size and also the outliers(which are larger than 2*avgSize) in MapStatus; 2. Add bytesShuffleToMemory, which tracks the size of remote blocks shuffled to memory; 3. Add spark.reducer.maxBytesShuffleToMemory, when bytesShuffleToMemory is above this configuration, blocks will shuffle to disk; bytesShuffleToMemory is increased when send fetch request(note that at this point, remote blocks maybe not fetched into memory yet, but we add the max memory to be used in the fetch request) and get decreased when the ByteBuf is released. spark.reducer.maxBytesShuffleToMemory is the max memory to be used for fetching remote blocks across all the ShuffleBlockFetcherIterators(there maybe multiple shuffle-read happening at the same time). When memory usage(indicated by bytesShuffleToMemory) is above spark.reducer.maxBytesShuffleToMemory, shuffle remote blocks to disk instead of memory. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964548#comment-15964548 ] Wenchen Fan commented on SPARK-19659: - [~jinxing6...@126.com] can you say more about fetching shuffle blocks into disk? How should users tune the {{spark.reducer.maxBytesShuffleToMemory}} config? Shall we have something similar to {{maxReqsInFlight}} to control how many request we can have to fetch blocks to disk at the same time? > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964508#comment-15964508 ] jin xing commented on SPARK-19659: -- [~irashid] Tracking memory used by Netty by swapping in our own PooledByteBufAllocator is really a good idea. Memory usage will be increased when allocate byte buffer in PoolByteBufAllocator and get decreased when ByteBuf's reference count is zero. Checking source code of Netty, I found that there is cache inside PoolByteBufAllocator. When memory is released, it will be returned to cache or chunklist, not destroyed necessarily. In my understanding, we can get the memory usage by tracking PooledByteBufAllocator, but the value is not the real footprint.(i.e. when memory is released, it maybe in cache other than destroyed.) > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964476#comment-15964476 ] Imran Rashid commented on SPARK-19659: -- currently the smallest unit is an entire block. That certainly could be changed, but that is another larger change to the way spark uses netty. bq. If the unit is block, I think it's really hard to avoid OOM entirely, as if the estimated block size is wrong, fetching this block may cause OOM and we can do nothing about it. I don't totally agree with this. Yes, the estimated block size could be wrong. The main problem now, though, is that the error is totally unbounded. I really don't think it should be hard to change spark to provide reasonable bounds on that error. Even if that first step doesn't entirely eliminate the chance of OOM, it would certainly help. And that can continue to be improved upon. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953968#comment-15953968 ] Wenchen Fan commented on SPARK-19659: - What's the smallest unit of fetching remote shuffle blocks? If the unit is block, I think it's really hard to avoid OOM entirely, as if the estimated block size is wrong, fetching this block may cause OOM and we can do nothing about it. (I guess that's why you add {{spark.reducer.maxBytesShuffleToMemory}} in your PR.) If the unit can be smaller like a byte buffer, and we can fully track and control the shuffle fetch memory usage, I think then we can solve the OOM problem pretty good without introducing new config to users. Is it possible to do it with some advanced netty API? > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15952401#comment-15952401 ] Imran Rashid commented on SPARK-19659: -- [~cloud_fan] The memory is allocated by netty, but its still possible to track it. Netty already lets you *poll* its memory usage, which gives a crude way to track the memory. That is being looked at in SPARK-9103. It already has an old pr which I think was close, but is now out-of-date; [~jsoltren] is looking at bringing it up to date. A bigger change would be to have closer interaction between netty and spark's memory management. Netty actually already has an extension point where spark could do this -- its {{ByteBufAllocator}}. Spark already configures the ByteBufAllocator, though it just chooses one of the existing implementations: https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/server/TransportServer.java#L95 https://github.com/apache/spark/blob/master/common/network-common/src/main/java/org/apache/spark/network/client/TransportClientFactory.java#L107 We could swap in our own ByteBufAllocator -- it could even just extend netty's {{PooledByteBufAllocator}} that we're using now, just track allocations as they happen; or even, check with the MemoryManager on each allocation whether there is memory available, and if not, don't do the fetch yet. Obviously those are bigger changes, but I think it should be possible conceptually. I don't know of any jiras to track that yet. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15952303#comment-15952303 ] Wenchen Fan commented on SPARK-19659: - Is it possible to track shuffle fetch memory? I think the memory is allocated at netty side. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900741#comment-15900741 ] jin xing commented on SPARK-19659: -- [~irashid] [~rxin] I uploaded SPARK-19659-design-v2.pdf, please take a look. Yes, only outliers should be tracked and MapStatus should stay compact. It is a good idea to track all the sizes that are more than 2x the average. “2x” is a default value, it will be configurable by parameter. We need to collect metrics for stability and performance: - For stability: 1) Blocks which have size between the average and 2x the average are underestimated. So there is a risk that those blocks cannot fit in memory. In this approach, driver should calculate the sum of underestimated sizes. 2) Show block sizes’ distribution of MapStatus, the distribution will between(0~100K, 100K~1M, 1~10M, 10~100M, 100M~1G, 1G~10G), which will help a lot for debugging(e.g. find skew situations). * For performance: 1) Memory(off heap or on heap) used for fetching blocks should be tracked. 2) Fetching blocks to disk will cause performance degradation. Executor should calculate the size of blocks shuffled to disk and the time cost. Yes, Imran, I will definitely break the proposal to smaller pieces(jiras and prs). The metrics part should be done first before other parts proposed here. What's more, why not make 2000 as a configurable parameter, thus user can chose the track the accurate sizes of blocks to some level. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15897244#comment-15897244 ] jin xing commented on SPARK-19659: -- [~irashid] Thanks a lot for your comments, really helpful. I got your idea and doing some tests now. I will refine the doc soon this week. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895502#comment-15895502 ] Imran Rashid commented on SPARK-19659: -- I think Reynold has a good point. I really don't like the idea of always have the MapStatus track 2k sizes -- I already have to regularly recommend to users that they bump their partition count > 2k to avoid an OOM from too many CompressedMapStatus. Going over 2k partitions generally gives big memory savings from using HighlyCompressedMapStatus. Your point about deciding how many outlier to track is valid; but I think there are a lot of other options you might consider as well, eg., track all the sizes that are more than 2x the average, or track a few different size buckets, and keep a bit set for each bucket, etc. these should allow the MapStatus to stay very compact, but have bounded error on the size. For implementation, I'd also break your proposal down into smaller pieces. In fact, the three ideas are all useful independently (though they are more robust together). But two larger pieces I see missing: 1) how will we test the changes out? not for correctness, but for performance / stability benefits? 2) are there metrics we should be collecting so we can better answer these questions, that we currently are not answering? eg., the distribution of sizes in MapStatus is not stored anywhere for later analysis (though its not easy to come up with a good way to store them, since there are n^2 sizes in one shuffle); how much memory is used by the network layer; how much error is there in the sizes from the MapStatus, etc. I think some parts can be implemented anyway, behind feature flags (perhaps undocumented), but its something to keep in mind. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895411#comment-15895411 ] jin xing commented on SPARK-19659: -- [~rxin] Thanks a lot for comment. Tracking average size and also the outliers is a good idea. But there can be multiple huge blocks creating too much pressure(e.g. there are 10% blocks much bigger than they other 90%) and it is a little bit hard to decide how many outliers we should track. If we track too many outliers, *MapStatus* can cost too much memory. I think the benefit of tracking the max for each N/2000 consecutive blocks is that we can avoid having *MapStatus* cost too much memory(at most around 2000Bytes). > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15893900#comment-15893900 ] Reynold Xin commented on SPARK-19659: - Rather than this tracking the max for each N/2000 consecutive blocks, why don't we track the average size and also the outliers? It seems like the concern is to avoid a single huge block creating too much pressure, then we just need to know the outliers. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891999#comment-15891999 ] jin xing commented on SPARK-19659: -- [~rxin] [~davies] [~andrewor14] [~joshrosen] I've uploaded a design doc, It's great if you could help comment on it when have time :) Actually, my data warehouse suffers a lot on this issue. Please take a look if possible. Sorry if this is bothering. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15884789#comment-15884789 ] jin xing commented on SPARK-19659: -- [~irashid] I've uploaded a design doc, please take a look and give your comments when you have time : ) > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > Attachments: SPARK-19659-design-v1.pdf > > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15878389#comment-15878389 ] jin xing commented on SPARK-19659: -- [~irashid] Thanks a lot for your comments. I will file a design pdf late this week and your concerns will be included. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15876466#comment-15876466 ] Imran Rashid commented on SPARK-19659: -- [~jinxing6...@126.com] Thanks for taking this on, I think this is a *really* important improvement for Spark -- but its also changing some very core logic which I think needs to be done with a lot of caution. Can you please post a design doc here for discussion? While the heuristics you are proposing seem reasonable, I have a number of concerns: * what about when there are > 2k partitions, and the block size is unknown? especially in the case of skew, this is a huge problem. Perhaps first we should just tackle that problem, to have better size estimations (with bounded error) in that case. * I think it will need to configured independently from maxBytesInFlight * Would it be possible to make the shuffle fetch memory usage get tracked by the memorymanager? That would be another way to avoid OOM. Note this pretty tricky since right now that memory is controlled by netty. * what are the performance ramifications of these changes? What tests are done to understand the effects? I still think that having the shuffle fetch streamed to disk is a good idea, but we should think carefully about the right way to control it, and some of these other ideas should be done first, perhaps. Its at least worth discussing before just doing the implementation. > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15873616#comment-15873616 ] Apache Spark commented on SPARK-19659: -- User 'jinxing64' has created a pull request for this issue: https://github.com/apache/spark/pull/16989 > Fetch big blocks to disk when shuffle-read > -- > > Key: SPARK-19659 > URL: https://issues.apache.org/jira/browse/SPARK-19659 > Project: Spark > Issue Type: Improvement > Components: Shuffle >Affects Versions: 2.1.0 >Reporter: jin xing > > Currently the whole block is fetched into memory(offheap by default) when > shuffle-read. A block is defined by (shuffleId, mapId, reduceId). Thus it can > be large when skew situations. If OOM happens during shuffle read, job will > be killed and users will be notified to "Consider boosting > spark.yarn.executor.memoryOverhead". Adjusting parameter and allocating more > memory can resolve the OOM. However the approach is not perfectly suitable > for production environment, especially for data warehouse. > Using Spark SQL as data engine in warehouse, users hope to have a unified > parameter(e.g. memory) but less resource wasted(resource is allocated but not > used), > It's not always easy to predict skew situations, when happen, it make sense > to fetch remote blocks to disk for shuffle-read, rather than > kill the job because of OOM. This approach is mentioned during the discussion > in SPARK-3019, by [~sandyr] and [~mridulm80] -- This message was sent by Atlassian JIRA (v6.3.15#6346) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org