[ https://issues.apache.org/jira/browse/SPARK-19659?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15964587#comment-15964587 ]

jin xing edited comment on SPARK-19659 at 4/11/17 4:11 PM:
-----------------------------------------------------------

[~cloud_fan]
Thanks a lot for taking a look into this, and sorry for the late reply.

My proposal is as below:
1. Track the average block size and also the outliers (those larger than 2*avgSize) in MapStatus (see the sketch after this list);
2. Add *bytesShuffleToMemory*, which tracks the size of remote blocks shuffled to memory;
3. Add *spark.reducer.maxBytesShuffleToMemory*; when bytesShuffleToMemory is above this configuration, blocks will be shuffled to disk.
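
For illustration, (1) could look roughly like the sketch below. The class and field names are hypothetical stand-ins, not the actual MapStatus implementation:

{code}
// Illustrative only: a simplified stand-in for the size information kept in
// MapStatus; not the real Spark class.
case class SketchedSizes(avgSize: Long, outliers: Map[Int, Long]) {
  // Exact size for outlier blocks, the average for everything else.
  def getSizeForBlock(reduceId: Int): Long = outliers.getOrElse(reduceId, avgSize)
}

object SketchedSizes {
  def fromSizes(sizes: Array[Long]): SketchedSizes = {
    val avg = if (sizes.nonEmpty) sizes.sum / sizes.length else 0L
    // Record exact sizes only for blocks larger than 2 * avgSize.
    val outliers = sizes.zipWithIndex.collect {
      case (size, reduceId) if size > 2 * avg => reduceId -> size
    }.toMap
    SketchedSizes(avg, outliers)
  }
}
{code}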

*bytesShuffleToMemory* is increased when a fetch request is sent (note that at 
this point the remote blocks may not have been fetched into memory yet, but we 
add the maximum memory the fetch request may use) and is decreased when the 
ByteBuf is released.
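
Roughly, that bookkeeping could be a shared counter like the sketch below; the object and method names are only for illustration, and the real change would hook into the existing fetch code:

{code}
import java.util.concurrent.atomic.AtomicLong

// Illustrative counter shared across all ShuffleBlockFetcherIterators in the
// executor; names are hypothetical, not actual Spark identifiers.
object ShuffleReadMemoryTracker {
  private val bytesShuffleToMemory = new AtomicLong(0L)

  // Called when a fetch request is sent: reserve the request's maximum size
  // up front, even though the remote blocks may not have arrived yet.
  def reserve(requestMaxBytes: Long): Long =
    bytesShuffleToMemory.addAndGet(requestMaxBytes)

  // Called when the fetched ByteBuf is released: give the bytes back.
  def release(requestMaxBytes: Long): Long =
    bytesShuffleToMemory.addAndGet(-requestMaxBytes)

  def currentBytes: Long = bytesShuffleToMemory.get()
}
{code}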

*spark.reducer.maxBytesShuffleToMemory* is the maximum memory to be used for 
fetching remote blocks across all the ShuffleBlockFetcherIterators (there may 
be multiple shuffle-reads happening at the same time). When memory usage 
(indicated by bytesShuffleToMemory) is above 
spark.reducer.maxBytesShuffleToMemory, shuffle remote blocks to disk instead 
of memory.
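
Under that assumption, the per-request decision reduces to a simple check against the configured limit; again just a sketch, reusing the hypothetical tracker above:

{code}
// Sketch: decide whether the next fetch request should go to disk instead of
// memory, given the value configured by spark.reducer.maxBytesShuffleToMemory.
def fetchToDisk(maxBytesShuffleToMemory: Long): Boolean =
  ShuffleReadMemoryTracker.currentBytes > maxBytesShuffleToMemory
{code}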





> Fetch big blocks to disk when shuffle-read
> ------------------------------------------
>
>                 Key: SPARK-19659
>                 URL: https://issues.apache.org/jira/browse/SPARK-19659
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle
>    Affects Versions: 2.1.0
>            Reporter: jin xing
>         Attachments: SPARK-19659-design-v1.pdf, SPARK-19659-design-v2.pdf
>
>
> Currently the whole block is fetched into memory (offheap by default) when 
> shuffle-read. A block is defined by (shuffleId, mapId, reduceId), so it can 
> be large in skew situations. If OOM happens during shuffle-read, the job 
> will be killed and users will be notified to "Consider boosting 
> spark.yarn.executor.memoryOverhead". Adjusting the parameter and allocating 
> more memory can resolve the OOM, but that approach is not well suited to 
> production environments, especially data warehouses.
> Using Spark SQL as the data engine in a warehouse, users hope to have a 
> unified parameter (e.g. memory) with less resource wasted (resource that is 
> allocated but not used).
> It's not always easy to predict skew situations; when they happen, it makes 
> sense to fetch remote blocks to disk for shuffle-read rather than kill the 
> job because of OOM. This approach was mentioned during the discussion in 
> SPARK-3019 by [~sandyr] and [~mridulm80].


