[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15964587#comment-15964587 ]

jin xing edited a comment on SPARK-19659 at 4/11/17 4:13 PM:
---

[~cloud_fan] Thanks a lot for taking a look at this, and sorry for the late reply. My proposal is as follows:

1. Track the average block size, and also the outliers (blocks larger than 2*avgSize), in *MapStatus*;
2. Add *bytesShuffleToMemory*, which tracks the total size of remote blocks shuffled to memory;
3. Add *spark.reducer.maxBytesShuffleToMemory*; when *bytesShuffleToMemory* rises above this configuration, blocks are shuffled to disk instead.

*bytesShuffleToMemory* is increased when a fetch request is sent (note that at this point the remote blocks may not have been fetched into memory yet, so we add the maximum memory the fetch request could use) and decreased when the ByteBuf is released.

*spark.reducer.maxBytesShuffleToMemory* is the maximum memory to be used for fetching remote blocks across all *ShuffleBlockFetcherIterator*s (there may be multiple shuffle-reads happening at the same time). When memory usage (indicated by *bytesShuffleToMemory*) is above *spark.reducer.maxBytesShuffleToMemory*, remote blocks are shuffled to disk instead of memory.

> Fetch big blocks to disk when shuffle-read
> --
>
> Key: SPARK-19659
> URL: https://issues.apache.org/jira/browse/SPARK-19659
> Project: Spark
> Issue Type: Improvement
> Components: Shuffle
> Affects Versions: 2.1.0
> Reporter: jin xing
> Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf
>
> Currently the whole block is fetched into memory (off-heap by default) during shuffle-read. A block is identified by (shuffleId, mapId, reduceId), so it can be large in skew situations. If an OOM happens during shuffle-read, the job is killed and users are notified to "Consider boosting spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating more memory can resolve the OOM, but this approach is not well suited to production environments, especially data warehouses.
> Using Spark SQL as the data engine in a warehouse, users hope for a unified parameter (e.g. memory) with less resource wasted (resource allocated but not used). Skew situations are not always easy to predict; when they happen, it makes sense to fetch remote blocks to disk for shuffle-read rather than kill the job because of OOM. This approach was mentioned during the discussion in SPARK-3019, by [~sandyr] and [~mridulm80].

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
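The accounting described in the proposal above can be sketched as follows. This is an illustrative Python toy under the proposal's assumptions, not Spark's actual (JVM/Netty) implementation; the class and method names are invented for the example:

```python
import threading

class ShuffleMemoryTracker:
    """Toy sketch of the proposed accounting (names are illustrative).
    The in-flight byte count is shared across all fetch iterators: it is
    incremented when a fetch request is sent, using the request's maximum
    expected size (the blocks are not in memory yet), and decremented
    when the fetched buffer (the ByteBuf) is released."""

    def __init__(self, max_bytes_shuffle_to_memory):
        self.max_bytes = max_bytes_shuffle_to_memory
        self.bytes_in_flight = 0  # plays the role of bytesShuffleToMemory
        self.lock = threading.Lock()

    def reserve(self, request_max_bytes):
        """Called before sending a fetch request. Returns True if the
        blocks may be fetched to memory, False if they should go to disk."""
        with self.lock:
            if self.bytes_in_flight + request_max_bytes > self.max_bytes:
                return False  # over the limit: shuffle these blocks to disk
            self.bytes_in_flight += request_max_bytes
            return True

    def release(self, request_max_bytes):
        """Called when the fetched buffer is released."""
        with self.lock:
            self.bytes_in_flight -= request_max_bytes

tracker = ShuffleMemoryTracker(max_bytes_shuffle_to_memory=100)
print(tracker.reserve(60))   # True  -> fetch to memory
print(tracker.reserve(60))   # False -> 60 + 60 > 100, fetch to disk
tracker.release(60)
print(tracker.reserve(60))   # True again once the buffer is released
```

Note the design choice the proposal implies: reserving the request's maximum size up front is conservative (memory may be counted before it is actually used), but it guarantees the shared limit is never exceeded by concurrent shuffle-reads.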
[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15891999#comment-15891999 ]

jin xing edited a comment on SPARK-19659 at 3/19/17 3:16 PM:
---

[~rxin] [~davies] [~andrewor14] [~joshrosen] [~kayousterhout] I've uploaded a design doc; it would be great if you could help comment on it when you have time :) My data warehouse actually suffers a lot from this issue. Please take a look if possible. Sorry if this is bothering you.
[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15900741#comment-15900741 ]

jin xing edited a comment on SPARK-19659 at 3/8/17 5:47 AM:
--

[~irashid] [~rxin] I've uploaded SPARK-19659-design-v2.pdf; please take a look. Yes, only the outliers should be tracked, and MapStatus should stay compact. It is a good idea to track all the sizes that are more than 2x the average; "2x" is a default value and will be configurable by a parameter.

We need to collect metrics for stability and performance:

- For stability:
1) Blocks whose size is between the average and 2x the average are underestimated, so there is a risk that those blocks cannot fit in memory. In this approach, the driver should calculate the sum of the underestimated sizes.
2) Show the distribution of block sizes in MapStatus across buckets (0~100K, 100K~1M, 1~10M, 10~100M, 100M~1G, 1G~10G), which will help a lot with debugging (e.g. finding skew situations).
- For performance:
1) Peak memory (off-heap or on-heap) used for fetching blocks should be tracked.
2) Fetching blocks to disk will cause performance degradation. The executor should calculate the size of the blocks shuffled to disk and the time cost.

Yes, Imran, I will definitely break the proposal into smaller pieces (JIRAs and PRs). The metrics part should be done first, before the other parts proposed here. What's more, why not make 2000 a configurable parameter, so that users can choose to track the accurate sizes of blocks to some level?
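The outlier-tracking idea discussed above — keep the average plus exact sizes only for blocks over the configurable 2x threshold — can be sketched as follows. This is an illustrative Python toy, not Spark's actual MapStatus code; the function names and the dict layout are invented for the example:

```python
def compress_map_status(sizes, factor=2.0):
    """Sketch of the proposed compact MapStatus: store the average of all
    block sizes plus exact sizes only for outliers larger than
    factor * average. `factor` is the configurable "2x" threshold."""
    avg = sum(sizes) / len(sizes)
    outliers = {i: s for i, s in enumerate(sizes) if s > factor * avg}
    return avg, outliers

def estimated_size(avg, outliers, block_id):
    # Exact size for tracked outliers, the average for everything else.
    # Blocks between avg and factor*avg remain underestimated, which is
    # exactly the stability risk the comment above says should be metered.
    return outliers.get(block_id, avg)

sizes = [10, 12, 8, 10, 500]          # one skewed block
avg, outliers = compress_map_status(sizes)
print(avg)        # 108.0
print(outliers)   # {4: 500} -- only block 4 exceeds 2 * 108
```

The storage cost grows only with the number of outliers, not the number of blocks, which is what keeps MapStatus compact under this scheme.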
[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15895411#comment-15895411 ]

jin xing edited a comment on SPARK-19659 at 3/4/17 2:11 AM:
--

[~rxin] Thanks a lot for the comment. Tracking the average size and also the outliers is a good idea. But there can be multiple huge blocks creating too much pressure (e.g. 10% of the blocks are much bigger than the other 90%), and it is a little hard to decide how many outliers we should track. If we track too many outliers, *MapStatus* can cost too much memory. I think the benefit of tracking the max of each group of N/2000 consecutive blocks is that we can keep *MapStatus* from costing too much memory (at most around 2000 bytes after compressing) while still having all outliers under control. Do you think it's worth trying?
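The bucketed-max alternative described in the comment above can be sketched as follows. This is an illustrative Python toy with invented names, not Spark's actual code; it shows why the layout is bounded at roughly 2000 entries and why it never underestimates a block:

```python
def bucketed_maxes(sizes, num_buckets=2000):
    """Split the N block sizes into at most `num_buckets` consecutive
    groups and keep only the max of each group. Memory is bounded by
    num_buckets regardless of N, and every reported size is an upper
    bound, so no outlier can slip through underestimated."""
    n = len(sizes)
    if n <= num_buckets:
        return list(sizes)              # small maps stay exact
    group = -(-n // num_buckets)        # ceil(n / num_buckets)
    return [max(sizes[i:i + group]) for i in range(0, n, group)]

def lookup(maxes, group_size, block_id):
    # Every block is reported as the max of its consecutive group.
    return maxes[block_id // group_size]

sizes = list(range(10)) + [1000]        # 11 blocks, one huge outlier
maxes = bucketed_maxes(sizes, num_buckets=4)
print(maxes)                            # [2, 5, 8, 1000]
```

The trade-off versus outlier tracking is overestimation: every block in a group containing a huge block is reported at the huge size, which may push more fetches to disk than strictly necessary.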
[jira] [Comment Edited] (SPARK-19659) Fetch big blocks to disk when shuffle-read
[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15884789#comment-15884789 ]

jin xing edited a comment on SPARK-19659 at 3/2/17 10:21 AM:
---

[~irashid] I've uploaded a design doc; please take a look and give your comments when you have time :) Thanks a lot for your previous comments; they were very helpful. It would be great if you could comment further so I can continue working on this :)