[jira] [Updated] (SPARK-27876) Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.
[ https://issues.apache.org/jira/browse/SPARK-27876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

feiwang updated SPARK-27876:
    Affects Version/s:     (was: 3.1.0)
                           (was: 2.4.3)
                       2.3.2

> Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.
>
>                 Key: SPARK-27876
>                 URL: https://issues.apache.org/jira/browse/SPARK-27876
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.3.2
>            Reporter: feiwang
>            Priority: Major
>
> There is a size limit on shuffle reads.
> If a shuffle partition block is larger than Integer.MAX_VALUE (2 GB) and
> the block is fetched from a remote host, an exception is thrown.
> {code:java}
> 2019-05-24 06:46:30,333 [9935] - WARN [shuffle-client-6-2:TransportChannelHandler@78] - Exception in connection from hadoop3747.jd.163.org/10.196.76.172:7337
> java.lang.IllegalArgumentException: Too large frame: 2991947178
>   at org.spark_project.guava.base.Preconditions.checkArgument(Preconditions.java:119)
>   at org.apache.spark.network.util.TransportFrameDecoder.decodeNext(TransportFrameDecoder.java:133)
>   at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:81)
>   at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
> {code}
> The task then throws a FetchFailedException.
> The task is retried, but it succeeds only if it is rescheduled to an
> executor on the same host as the oversize shuffle partition block, where the
> block does not have to be fetched from a remote host.
> However, if there is more than one oversize (>2 GB) shuffle partition block,
> the task never succeeds, and this may cause the application to fail.
> In this PR, I propose a new way to fetch shuffle blocks: when the relevant
> shuffle partition block is oversize, it is fetched in multiple requests.
> A brief overview:
> 1. Set a shuffle fetch threshold (SHUFFLE_FETCH_THRESHOLD) to Int.MaxValue (2 GB).
> 2. Add a parameter, spark.shuffle.fetch.split, to control whether large
> partitions are fetched in multiple requests.
> 3. When creating the MapStatus, calculate the number of segments of each
> shuffle block (Math.ceil(size / SHUFFLE_FETCH_THRESHOLD)), and record the
> segment count only when it is larger than 1.
> 4. Define a new BlockId type, ShuffleBlockSegmentId, used to identify the
> fetch method.
> 5. When spark.shuffle.fetch.split is enabled, send a ShuffleBlockSegmentId
> message to the shuffle service instead of a ShuffleBlockId message.
> 6. For a ShuffleBlockId, use a sequence of ManagedBuffers to represent its
> block instead of a single ManagedBuffer.
> 7. In ShuffleBlockFetcherIterator, create a PriorityBlockQueue for each
> ShuffleBlockId to store the fetched SegmentManagedBuffers. When all segments
> of a ShuffleBlockId have been fetched, take the corresponding sequence of
> ManagedBuffers (ordered by segmentId) as the success result for that
> ShuffleBlockId.
> 8. On the shuffle service side, if the blockId in openBlocks is a
> ShuffleBlockSegmentId, respond with a segment ManagedBuffer of the block; if
> the blockId is a ShuffleBlockId, respond with the whole ManagedBuffer of the
> block, as before.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
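
Steps 1 and 3 of the description reduce to a ceiling division of the block size by the fetch threshold. The following is only a minimal sketch of that arithmetic, not code from the PR: the class name ShuffleSegments and both method names are hypothetical, and the frame size from the stack trace is used purely as an illustrative block size. Integer arithmetic is used in place of Math.ceil to avoid floating-point rounding on large values.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Spark: illustrates the segment arithmetic only.
class ShuffleSegments {
    // Assumption taken from step 1 of the issue: threshold = Int.MaxValue bytes.
    static final long SHUFFLE_FETCH_THRESHOLD = Integer.MAX_VALUE;

    /** Number of segments, i.e. ceil(blockSize / threshold), using integer math. */
    static int segmentCount(long blockSize, long threshold) {
        if (blockSize < 0 || threshold <= 0) {
            throw new IllegalArgumentException("blockSize must be >= 0 and threshold > 0");
        }
        if (blockSize == 0) {
            return 1; // an empty block is still fetched as one (empty) segment
        }
        return (int) ((blockSize - 1) / threshold + 1);
    }

    /** Returns {offset, length} pairs, one per segment, covering the whole block. */
    static List<long[]> segmentRanges(long blockSize, long threshold) {
        int count = segmentCount(blockSize, threshold);
        List<long[]> ranges = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            long offset = (long) i * threshold;
            long length = Math.min(threshold, blockSize - offset);
            ranges.add(new long[] {offset, length});
        }
        return ranges;
    }

    public static void main(String[] args) {
        long size = 2991947178L; // illustrative: the frame size from the stack trace
        List<long[]> ranges = segmentRanges(size, SHUFFLE_FETCH_THRESHOLD);
        for (int i = 0; i < ranges.size(); i++) {
            System.out.println("segment " + i + ": offset=" + ranges.get(i)[0]
                + " length=" + ranges.get(i)[1]);
        }
        // prints:
        // segment 0: offset=0 length=2147483647
        // segment 1: offset=2147483647 length=844463531
    }
}
```

Under these assumptions a block of exactly Int.MaxValue bytes still needs one segment, so per step 3 no segment count would be recorded for it.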
[jira] [Updated] (SPARK-27876) Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.
[ https://issues.apache.org/jira/browse/SPARK-27876?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-27876:
    Summary: Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.  (was: [CORE][SHUFFLE] Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.)

> Split large shuffle partition to multi-segments to enable transfer oversize shuffle partition block.
>
>                 Key: SPARK-27876
>                 URL: https://issues.apache.org/jira/browse/SPARK-27876
>             Project: Spark
>          Issue Type: Improvement
>          Components: Shuffle, Spark Core
>    Affects Versions: 2.4.3, 3.1.0
>            Reporter: feiwang
>            Priority: Major
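
Step 7 of the issue description (collect the fetched segments of one block in a priority queue, then hand them over in segmentId order once all have arrived) might look roughly like the sketch below. It is an illustration under stated assumptions rather than the PR's code: SegmentAssembler and Segment are hypothetical stand-ins for the ShuffleBlockFetcherIterator logic and SegmentManagedBuffer, and java.util.concurrent.PriorityBlockingQueue is assumed to be what the description calls a PriorityBlockQueue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical sketch, not a Spark class: reassembles the segments of one block.
class SegmentAssembler {
    /** Hypothetical stand-in for a fetched segment buffer. */
    static final class Segment {
        final int segmentId;
        final byte[] data;

        Segment(int segmentId, byte[] data) {
            this.segmentId = segmentId;
            this.data = data;
        }
    }

    private final int expectedSegments;
    private final PriorityBlockingQueue<Segment> queue;

    SegmentAssembler(int expectedSegments) {
        if (expectedSegments < 1) {
            throw new IllegalArgumentException("expectedSegments must be >= 1");
        }
        this.expectedSegments = expectedSegments;
        // Order queued segments by segmentId so they drain in block order.
        this.queue = new PriorityBlockingQueue<>(
            expectedSegments, Comparator.comparingInt((Segment s) -> s.segmentId));
    }

    /**
     * Records one fetched segment. Returns the full, ordered segment list once
     * every segment has arrived, and an empty Optional before that.
     * Duplicate segment ids are not detected in this sketch.
     */
    synchronized Optional<List<Segment>> add(Segment segment) {
        queue.put(segment);
        if (queue.size() < expectedSegments) {
            return Optional.empty();
        }
        List<Segment> ordered = new ArrayList<>(expectedSegments);
        Segment next;
        while ((next = queue.poll()) != null) {
            ordered.add(next); // poll() returns the smallest segmentId first
        }
        return Optional.of(ordered);
    }

    public static void main(String[] args) {
        SegmentAssembler assembler = new SegmentAssembler(3);
        assembler.add(new Segment(2, new byte[0])); // segments may arrive out of order
        assembler.add(new Segment(0, new byte[0]));
        List<Segment> done = assembler.add(new Segment(1, new byte[0])).get();
        StringBuilder ids = new StringBuilder();
        for (Segment s : done) {
            ids.append(s.segmentId).append(' ');
        }
        System.out.println(ids.toString().trim()); // prints "0 1 2"
    }
}
```

The add method is synchronized so that the size check and the drain happen atomically; whether the real iterator needs that depends on how fetch callbacks are threaded, which the description does not specify.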