Repository: spark
Updated Branches:
  refs/heads/master 300807c6e -> 16612638f
[SPARK-21517][CORE] Avoid copying memory when transferring chunks remotely

## What changes were proposed in this pull request?

In our production cluster, an OOM occurs when NettyBlockRpcServer receives an OpenBlocks message. The cause we observed is as follows: when BlockManagerManagedBuffer calls ChunkedByteBuffer#toNetty, it uses Unpooled.wrappedBuffer(ByteBuffer... buffers), which uses the default maxNumComponents=16 in the underlying CompositeByteBuf. When the number of components is greater than 16, CompositeByteBuf executes consolidateIfNeeded:

```java
int numComponents = this.components.size();
if (numComponents > this.maxNumComponents) {
    int capacity = ((CompositeByteBuf.Component) this.components.get(numComponents - 1)).endOffset;
    ByteBuf consolidated = this.allocBuffer(capacity);
    for (int c = 0; c < numComponents; ++c) {
        CompositeByteBuf.Component c1 = (CompositeByteBuf.Component) this.components.get(c);
        ByteBuf b = c1.buf;
        consolidated.writeBytes(b);
        c1.freeIfNecessary();
    }
    CompositeByteBuf.Component var7 = new CompositeByteBuf.Component(consolidated);
    var7.endOffset = var7.length;
    this.components.clear();
    this.components.add(var7);
}
```

which allocates a new buffer and copies every component into it, consuming extra memory. We can use the other API, Unpooled.wrappedBuffer(int maxNumComponents, ByteBuffer... buffers), to avoid this consumption.

## How was this patch tested?

Tested in our production cluster.

Author: zhoukang <zhouk...@xiaomi.com>

Closes #18723 from caneGuy/zhoukang/fix-chunkbuffer.
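The copy described above can be illustrated without Netty. The sketch below is a simplified, hypothetical model of CompositeByteBuf's consolidation (the `CompositeModel` class and `copiedBytes` helper are ours, for illustration only, not Netty's API): chunks stay separate until their count exceeds maxNumComponents, at which point every chunk is copied into one newly allocated buffer; sizing the threshold to the chunk count, as this patch does, avoids the copy entirely.

```java
import java.util.ArrayList;
import java.util.List;

public class ConsolidationSketch {

    // Hypothetical model class, for illustration only; the real logic
    // lives in io.netty.buffer.CompositeByteBuf#consolidateIfNeeded.
    static final class CompositeModel {
        private final int maxNumComponents;
        private List<byte[]> components = new ArrayList<>();
        long bytesCopied = 0; // copy cost incurred by consolidation

        CompositeModel(int maxNumComponents) {
            this.maxNumComponents = maxNumComponents;
        }

        void addComponent(byte[] chunk) {
            components.add(chunk);
            consolidateIfNeeded();
        }

        private void consolidateIfNeeded() {
            if (components.size() > maxNumComponents) {
                int capacity = 0;
                for (byte[] c : components) capacity += c.length;
                byte[] consolidated = new byte[capacity]; // one big allocation
                int offset = 0;
                for (byte[] c : components) {
                    System.arraycopy(c, 0, consolidated, offset, c.length);
                    offset += c.length;
                    bytesCopied += c.length;
                }
                components = new ArrayList<>();
                components.add(consolidated);
            }
        }
    }

    // Total bytes copied when wrapping numChunks chunks of chunkSize bytes
    // under a given maxNumComponents threshold.
    static long copiedBytes(int numChunks, int chunkSize, int maxNumComponents) {
        CompositeModel m = new CompositeModel(maxNumComponents);
        for (int i = 0; i < numChunks; i++) {
            m.addComponent(new byte[chunkSize]);
        }
        return m.bytesCopied;
    }

    public static void main(String[] args) {
        // Default threshold of 16: adding the 17th chunk triggers a full copy.
        System.out.println("maxNumComponents=16: copied "
                + copiedBytes(32, 1024, 16) + " bytes");
        // Threshold sized to the chunk count (this patch's approach): no copy.
        System.out.println("maxNumComponents=32: copied "
                + copiedBytes(32, 1024, 32) + " bytes");
    }
}
```

In this model, 32 chunks of 1 KiB under the default threshold incur a 17 KiB copy when the 17th chunk pushes the count over 16, while a threshold of 32 copies nothing, which is exactly the effect of passing chunks.length to wrappedBuffer.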
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16612638
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16612638
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16612638

Branch: refs/heads/master
Commit: 16612638f0539f197eb7deb1be2ec53fed60d707
Parents: 300807c
Author: zhoukang <zhouk...@xiaomi.com>
Authored: Tue Jul 25 17:59:21 2017 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Jul 25 17:59:21 2017 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/16612638/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index 2f905c8..f48bfd5 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -66,7 +66,7 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
    * Wrap this buffer to view it as a Netty ByteBuf.
    */
   def toNetty: ByteBuf = {
-    Unpooled.wrappedBuffer(getChunks(): _*)
+    Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
   }

   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org