[ https://issues.apache.org/jira/browse/SPARK-24307?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Xiao Li updated SPARK-24307:
----------------------------
    Labels: release-notes  (was: releasenotes)

> Support sending messages over 2GB from memory
> ---------------------------------------------
>
>                 Key: SPARK-24307
>                 URL: https://issues.apache.org/jira/browse/SPARK-24307
>             Project: Spark
>          Issue Type: Sub-task
>          Components: Block Manager, Spark Core
>    Affects Versions: 2.3.0
>            Reporter: Imran Rashid
>            Assignee: Imran Rashid
>            Priority: Major
>              Labels: release-notes
>             Fix For: 2.4.0
>
>
> Spark's networking layer supports sending messages backed by a {{FileRegion}}
> or a {{ByteBuf}}. Sending a large {{FileRegion}} works, because netty supports
> large FileRegions. However, {{ByteBuf}} is limited to 2GB. This is particularly
> a problem for sending large datasets that are already in memory, e.g. cached
> RDD blocks.
> For example, if you try to replicate a block stored in memory that is over
> 2GB, you will see an exception like:
> {noformat}
> 18/05/16 12:40:57 ERROR client.TransportClient: Failed to send RPC 7420542363232096629 to xyz.com/172.31.113.213:44358: io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
> io.netty.handler.codec.EncoderException: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
>         at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:106)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
>         at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:816)
>         at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:723)
>         at io.netty.handler.timeout.IdleStateHandler.write(IdleStateHandler.java:302)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
>         at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:730)
>         at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:38)
>         at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1081)
>         at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:1128)
>         at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1070)
>         at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>         at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:403)
>         at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:463)
>         at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
>         at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.IndexOutOfBoundsException: readerIndex: 0, writerIndex: -1294617291 (expected: 0 <= readerIndex <= writerIndex <= capacity(-1294617291))
>         at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:129)
>         at io.netty.buffer.CompositeByteBuf.setIndex(CompositeByteBuf.java:1688)
>         at io.netty.buffer.CompositeByteBuf.<init>(CompositeByteBuf.java:110)
>         at io.netty.buffer.Unpooled.wrappedBuffer(Unpooled.java:359)
>         at org.apache.spark.util.io.ChunkedByteBuffer.toNetty(ChunkedByteBuffer.scala:87)
>         at org.apache.spark.storage.ByteBufferBlockData.toNetty(BlockManager.scala:95)
>         at org.apache.spark.storage.BlockManagerManagedBuffer.convertToNetty(BlockManagerManagedBuffer.scala:52)
>         at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:58)
>         at org.apache.spark.network.protocol.MessageEncoder.encode(MessageEncoder.java:33)
>         at io.netty.handler.codec.MessageToMessageEncoder.write(MessageToMessageEncoder.java:88)
>         ... 17 more
> {noformat}
> A simple solution is to create a {{FileRegion}} backed by a
> {{ChunkedByteBuffer}} (Spark's existing data structure for supporting blocks
> larger than 2GB in memory).
> A drawback to this approach is that a block cached in memory as
> deserialized values would need to have the *entire* block serialized into
> memory before it can be pushed. However, avoiding that would involve a larger
> change to the block manager as well and is not strictly necessary, so it can
> be handled separately as a performance improvement.
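
To illustrate the proposal in the description, here is a minimal sketch of a region that streams a list of in-memory chunks to a channel while tracking sizes as a long. The class name ChunkedRegionSketch and its methods are hypothetical; it only loosely mirrors the count / transferred / transferTo shape of netty's FileRegion and is not the actual change made for this issue. The helper asIntSize is also an assumption, included to show that the negative writerIndex in the trace is consistent with a byte count of 3,000,350,005 wrapping around a signed 32-bit int.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch: streams several in-memory chunks using long-based accounting. */
final class ChunkedRegionSketch {
    private final List<ByteBuffer> chunks = new ArrayList<>();
    private final long count;        // total bytes across all chunks, may exceed Integer.MAX_VALUE
    private long transferred = 0L;   // bytes written so far
    private int currentChunk = 0;    // index of the chunk currently being written

    ChunkedRegionSketch(List<ByteBuffer> source) {
        long total = 0L;
        for (ByteBuffer b : source) {
            // duplicate() shares the bytes but keeps an independent position/limit
            ByteBuffer d = b.duplicate();
            chunks.add(d);
            total += d.remaining();
        }
        this.count = total;
    }

    long count() { return count; }

    long transferred() { return transferred; }

    /** Writes as much as the channel accepts right now; returns bytes written by this call. */
    long transferTo(WritableByteChannel target) throws IOException {
        long written = 0L;
        while (currentChunk < chunks.size()) {
            ByteBuffer chunk = chunks.get(currentChunk);
            if (!chunk.hasRemaining()) {
                currentChunk++;
                continue;
            }
            int n = target.write(chunk);
            if (n == 0) {
                break; // channel cannot take more for now; the caller would retry later
            }
            written += n;
        }
        transferred += written;
        return written;
    }

    /** Shows what a long byte count becomes when it is forced into a signed 32-bit int. */
    static int asIntSize(long size) { return (int) size; }

    public static void main(String[] args) throws IOException {
        List<ByteBuffer> data = Arrays.asList(
            ByteBuffer.wrap(new byte[] {1, 2, 3}),
            ByteBuffer.wrap(new byte[] {4, 5}));
        ChunkedRegionSketch region = new ChunkedRegionSketch(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        region.transferTo(Channels.newChannel(out));
        System.out.println(region.transferred() + " of " + region.count() + " bytes sent");
        System.out.println(asIntSize(3000350005L)); // prints -1294617291
    }
}
```

Because no single buffer ever has to describe the whole payload, the total size is never narrowed to an int; each chunk stays under the per-buffer limit and only the running totals are 64-bit.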