[ 
https://issues.apache.org/jira/browse/SPARK-27876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

feiwang updated SPARK-27876:
----------------------------
    Description: 
Shuffle reads are subject to a size limit.

If a shuffle partition block is larger than Integer.MAX_VALUE bytes (2 GB) and
the block is fetched remotely, an exception is thrown.

{code:java}
2019-05-24 06:46:30,333 [9935] - WARN [shuffle-client-6-2:TransportChannelHandler@78] - Exception in connection from hadoop3747.jd.163.org/10.196.76.172:7337
java.lang.IllegalArgumentException: Too large frame: 2991947178
    at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
    at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
{code}

The task then fails with a FetchFailedException.
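
The size check behind the "Too large frame" message can be illustrated with a
minimal sketch. This is not Spark's TransportFrameDecoder code: the class
FrameSizeCheck, the constant MAX_FRAME_SIZE and the method checkFrameSize are
hypothetical names, and the limit is assumed to be Integer.MAX_VALUE as
described above.

```java
// Hypothetical sketch of a frame-size guard; not taken from Spark.
final class FrameSizeCheck {
    // Assumed limit: the largest frame a single fetch may carry (about 2 GB).
    static final long MAX_FRAME_SIZE = Integer.MAX_VALUE;

    // Throws if the announced frame size is larger than the assumed limit.
    static void checkFrameSize(long frameSize) {
        if (frameSize > MAX_FRAME_SIZE) {
            throw new IllegalArgumentException("Too large frame: " + frameSize);
        }
    }

    public static void main(String[] args) {
        long reported = 2991947178L; // frame size taken from the log above
        try {
            checkFrameSize(reported);
        } catch (IllegalArgumentException e) {
            // The reported size does not fit in a signed 32-bit length.
            System.out.println(e.getMessage());
        }
    }
}
```

The frame size in the log, 2991947178 bytes, exceeds Integer.MAX_VALUE
(2147483647), so a block of that size cannot be described by a single
int-sized frame.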

The task is retried, but it succeeds only if it is rescheduled to an executor
on the same host as the oversized shuffle partition block, so that the block is
read locally instead of being fetched over the network.

However, if there is more than one oversized (>2 GB) shuffle partition block,
the task can never succeed, which may cause the application to fail.

In this PR, I propose a new way to fetch shuffle blocks: when a shuffle
partition block is oversized, it is fetched in multiple requests.

A brief overview:

1. Set a shuffle fetch threshold (SHUFFLE_FETCH_THRESHOLD) of Int.MaxValue
(2 GB).
2. Add a parameter, spark.shuffle.fetch.split, that controls whether large
partitions are fetched in multiple requests.
3. When creating the MapStatus, calculate the number of segments of each
shuffle block (Math.ceil(size / SHUFFLE_FETCH_THRESHOLD)), and record the
segment count only when it is greater than 1.
4. Define a new BlockId type, ShuffleBlockSegmentId, used to identify the
fetch method.
5. When spark.shuffle.fetch.split is enabled, send a ShuffleBlockSegmentId
message to the shuffle service instead of a ShuffleBlockId message.
6. For a ShuffleBlockId, use a sequence of ManagedBuffers to represent its
block instead of a single ManagedBuffer.
7. In ShuffleBlockFetcherIterator, create a PriorityBlockQueue for each
ShuffleBlockId to store the fetched SegmentManagedBuffers. When all segments of
a ShuffleBlockId have been fetched, take the corresponding sequence of
ManagedBuffers (ordered by segmentId) as the success result for that
ShuffleBlockId.
8. On the shuffle service side, if the blockId in an openBlocks request is a
ShuffleBlockSegmentId, respond with a ManagedBuffer for that segment of the
block; if the blockId is a ShuffleBlockId, respond with a ManagedBuffer for the
whole block, as before.
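
Steps 1, 3 and 7 can be sketched as follows. This is an illustration under
stated assumptions, not the code of the PR: the class ShuffleSegmentSketch, the
Segment type and the methods segmentCount, split and inSegmentOrder are
hypothetical names, and java.util.PriorityQueue stands in for the queue named
in step 7. The segment count uses integer ceiling division, because dividing
two integers truncates the quotient before Math.ceil could round it up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the segment arithmetic; not taken from the PR.
final class ShuffleSegmentSketch {
    // Assumed threshold from step 1: Int.MaxValue bytes (about 2 GB).
    static final long SHUFFLE_FETCH_THRESHOLD = Integer.MAX_VALUE;

    // Hypothetical value type: one contiguous byte range of a shuffle block.
    static final class Segment {
        final int segmentId;
        final long offset;
        final long length;

        Segment(int segmentId, long offset, long length) {
            this.segmentId = segmentId;
            this.offset = offset;
            this.length = length;
        }
    }

    // Step 3: number of segments, rounded up; a small or empty block is one segment.
    static int segmentCount(long blockSize, long threshold) {
        long count = (blockSize + threshold - 1) / threshold;
        return (int) Math.max(1L, count);
    }

    // Splits [0, blockSize) into consecutive ranges of at most `threshold` bytes.
    static List<Segment> split(long blockSize, long threshold) {
        int count = segmentCount(blockSize, threshold);
        List<Segment> segments = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            long offset = (long) i * threshold;
            long length = Math.min(threshold, blockSize - offset);
            segments.add(new Segment(i, offset, length));
        }
        return segments;
    }

    // Step 7: segments may arrive in any order; draining a priority queue
    // keyed by segmentId restores the order of the original block.
    static List<Segment> inSegmentOrder(List<Segment> arrived) {
        PriorityQueue<Segment> queue =
            new PriorityQueue<>((a, b) -> Integer.compare(a.segmentId, b.segmentId));
        queue.addAll(arrived);
        List<Segment> ordered = new ArrayList<>(arrived.size());
        while (!queue.isEmpty()) {
            ordered.add(queue.poll());
        }
        return ordered;
    }

    public static void main(String[] args) {
        long blockSize = 2991947178L; // block size taken from the log above
        for (Segment s : split(blockSize, SHUFFLE_FETCH_THRESHOLD)) {
            System.out.println(s.segmentId + " offset=" + s.offset + " length=" + s.length);
        }
    }
}
```

For the 2991947178-byte block from the log, this yields two segments: one of
2147483647 bytes at offset 0 and one of 844463531 bytes at offset 2147483647.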



> [CORE][SHUFFLE] Split large shuffle partition to multi-segments to enable 
> transfer oversize shuffle partition block.
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27876
>                 URL: https://issues.apache.org/jira/browse/SPARK-27876
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.4.3, 3.1.0
>            Reporter: feiwang
>            Priority: Major
>


