[ 
https://issues.apache.org/jira/browse/SPARK-27876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

feiwang updated SPARK-27876:
----------------------------
    Affects Version/s:     (was: 3.1.0)
                           (was: 2.4.3)
                       2.3.2

> Split large shuffle partition to multi-segments to enable transfer oversize 
> shuffle partition block.
> ----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27876
>                 URL: https://issues.apache.org/jira/browse/SPARK-27876
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.3.2
>            Reporter: feiwang
>            Priority: Major
>
> Shuffle reads are subject to a size limit. 
> If a shuffle partition block is larger than Integer.MAX_VALUE bytes (2 GB) and 
> is fetched from a remote node, an exception is thrown.
> {code:java}
>     2019-05-24 06:46:30,333 [9935] - WARN 
> [shuffle-client-6-2:TransportChannelHandler@78] - Exception in connection 
> from hadoop3747.jd.163.org/10.196.76.172:7337
>     java.lang.IllegalArgumentException: Too large frame: 2991947178
>     at 
> org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
>     at 
> org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
>     at 
> org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
>     at 
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> {code}
> The task then throws a FetchFailedException.
> The task is retried, but it succeeds only if it is rescheduled to an executor 
> on the same host as the oversized shuffle partition block.
> However, if there is more than one oversized (>2 GB) shuffle partition block, 
> the task can never succeed, which may cause the application to fail.
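
The failure mode described above can be shown in miniature. This is only a sketch, not Spark's code: it assumes the frame decoder rejects any frame whose length is not below Integer.MAX_VALUE, which is what the description and the stack trace suggest. The class and method names are hypothetical.

```java
public class FrameLimitSketch {
    // Assumed limit, mirroring the 2 GB bound described in the issue.
    static final long MAX_FRAME_SIZE = Integer.MAX_VALUE;

    // Hypothetical stand-in for the decoder's size check.
    static void checkFrameSize(long frameSize) {
        if (frameSize >= MAX_FRAME_SIZE) {
            throw new IllegalArgumentException("Too large frame: " + frameSize);
        }
    }

    public static void main(String[] args) {
        // A small frame passes the check.
        checkFrameSize(1_000_000L);
        try {
            // The frame size reported in the log above does not fit in an int.
            checkFrameSize(2_991_947_178L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the rejected size comes from the block itself, retrying the same remote fetch cannot succeed; only a local read avoids the check.
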
> In this PR, I propose a new way to fetch shuffle blocks: an oversized shuffle 
> partition block is fetched in multiple requests.
> A brief outline:
> 1. Set a shuffle fetch threshold (SHUFFLE_FETCH_THRESHOLD) to Int.MaxValue (2 GB).
> 2. Add a parameter, spark.shuffle.fetch.split, to control whether large 
> partitions are fetched in multiple requests.
> 3. When creating the MapStatus, calculate the number of segments of each 
> shuffle block (Math.ceil(size / SHUFFLE_FETCH_THRESHOLD)), and record the 
> segment count only when it is larger than 1.
> 4. Define a new BlockId type, ShuffleBlockSegmentId, used to identify the 
> fetch method.
> 5. When spark.shuffle.fetch.split is enabled, send a ShuffleBlockSegmentId 
> message to the shuffle service instead of a ShuffleBlockId message.
> 6. For a ShuffleBlockId, use a sequence of ManagedBuffers to represent its 
> block instead of a single ManagedBuffer.
> 7. In ShuffleBlockFetcherIterator, create a PriorityBlockQueue for each 
> ShuffleBlockId to store the fetched SegmentManagedBuffers. When all segments 
> of a ShuffleBlockId have been fetched, take the corresponding sequence of 
> ManagedBuffers (ordered by segmentId) as the success result for that 
> ShuffleBlockId.
> 8. On the shuffle service side, if the blockId in openBlocks is a 
> ShuffleBlockSegmentId, respond with a ManagedBuffer for that segment of the 
> block; if the blockId is a ShuffleBlockId, respond with a ManagedBuffer for 
> the whole block, as before.
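
Steps 3 and 7 of the outline can be sketched as follows. This is an illustration under stated assumptions, not the code from the PR: Segment and SegmentCollector are hypothetical stand-ins for the proposed SegmentManagedBuffer and the per-block priority queue, and the payload is a plain string rather than a buffer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;

public class SegmentSketch {
    // Assumed threshold from step 1 of the outline.
    static final long SHUFFLE_FETCH_THRESHOLD = Integer.MAX_VALUE;

    // Step 3: ceil(size / threshold), done in integer arithmetic to avoid
    // floating-point rounding on large sizes.
    static int segmentCount(long blockSize, long threshold) {
        if (blockSize <= 0) {
            return 1;
        }
        return (int) ((blockSize + threshold - 1) / threshold);
    }

    // Hypothetical stand-in for one fetched segment of a block.
    static final class Segment {
        final int segmentId;
        final String payload;

        Segment(int segmentId, String payload) {
            this.segmentId = segmentId;
            this.payload = payload;
        }
    }

    // Step 7: buffer segments as they arrive (in any order) and release them
    // sorted by segmentId once every segment of the block is present.
    static final class SegmentCollector {
        private final int expected;
        private final PriorityQueue<Segment> queue =
            new PriorityQueue<>(Comparator.comparingInt((Segment s) -> s.segmentId));

        SegmentCollector(int expected) {
            this.expected = expected;
        }

        // Returns the ordered segments when the block is complete, else empty.
        Optional<List<Segment>> add(Segment segment) {
            queue.add(segment);
            if (queue.size() < expected) {
                return Optional.empty();
            }
            List<Segment> ordered = new ArrayList<>(expected);
            while (!queue.isEmpty()) {
                ordered.add(queue.poll());
            }
            return Optional.of(ordered);
        }
    }

    public static void main(String[] args) {
        // The block size from the log in the description needs two segments.
        int n = segmentCount(2_991_947_178L, SHUFFLE_FETCH_THRESHOLD);
        SegmentCollector collector = new SegmentCollector(n);
        collector.add(new Segment(1, "tail"));   // arrives first, out of order
        List<Segment> done = collector.add(new Segment(0, "head")).get();
        for (Segment s : done) {
            System.out.println(s.segmentId + ":" + s.payload);
        }
    }
}
```

A block at or below the threshold yields a segment count of 1, which matches the outline's rule of recording counts only when they exceed 1.
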



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
