Repository: spark
Updated Branches:
  refs/heads/branch-1.6 7a02c446f -> baf29854e


[SPARK-14290] [SPARK-13352] [CORE] [BACKPORT-1.6] avoid significant memory copy 
in Netty's tran…

## What changes were proposed in this pull request?
When netty transfer data that is not `FileRegion`, data will be in format of 
`ByteBuf`, If the data is large, there will occur significant performance issue 
because there is memory copy underlying in `sun.nio.ch.IOUtil.write`, the CPU 
is 100% used, and network is very low.

In this PR, if data size is large, we will split it into small chunks to call 
`WritableByteChannel.write()`, so that avoid wasting of memory copy. Because 
the data can't be written within a single write, and it will call `transferTo` 
multiple times.

## How was this patch tested?
Spark unit test and manual test.
Manual test:
`sc.parallelize(Array(1,2,3),3).mapPartitions(a=>Array(new Array[Double](1024 * 
1024 * 50)).iterator).reduce((a,b)=> a).length`

For more details, please refer to 
[SPARK-14290](https://issues.apache.org/jira/browse/SPARK-14290)

Author: Zhang, Liye <liye.zh...@intel.com>

Closes #12296 from liyezhang556520/apache-branch-1.6-spark-14290.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/baf29854
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/baf29854
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/baf29854

Branch: refs/heads/branch-1.6
Commit: baf29854eaa41976c75ecd2c472806c4a1c02c2a
Parents: 7a02c44
Author: Zhang, Liye <liye.zh...@intel.com>
Authored: Mon Apr 11 10:06:57 2016 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Apr 11 10:06:57 2016 -0700

----------------------------------------------------------------------
 .../network/protocol/MessageWithHeader.java     | 30 +++++++++++++++++++-
 1 file changed, 29 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/baf29854/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
----------------------------------------------------------------------
diff --git 
a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
 
b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index d686a95..44403ee 100644
--- 
a/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ 
b/network/common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -18,6 +18,7 @@
 package org.apache.spark.network.protocol;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 
 import com.google.common.base.Preconditions;
@@ -39,6 +40,14 @@ class MessageWithHeader extends AbstractReferenceCounted 
implements FileRegion {
   private final long bodyLength;
   private long totalBytesTransferred;
 
+  /**
+   * When the write buffer size is larger than this limit, I/O will be done in 
chunks of this size.
+   * The size should not be too large as it will waste underlying memory copy. 
e.g. If network
+   * available buffer is smaller than this limit, the data cannot be sent 
within one single write
+   * operation while it still will make memory copy with this size.
+   */
+  private static final int NIO_BUFFER_LIMIT = 256 * 1024;
+
   MessageWithHeader(ByteBuf header, Object body, long bodyLength) {
     Preconditions.checkArgument(body instanceof ByteBuf || body instanceof 
FileRegion,
       "Body must be a ByteBuf or a FileRegion.");
@@ -102,8 +111,27 @@ class MessageWithHeader extends AbstractReferenceCounted 
implements FileRegion {
   }
 
   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws 
IOException {
-    int written = target.write(buf.nioBuffer());
+    ByteBuffer buffer = buf.nioBuffer();
+    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
+      target.write(buffer) : writeNioBuffer(target, buffer);
     buf.skipBytes(written);
     return written;
   }
+
+  private int writeNioBuffer(
+      WritableByteChannel writeCh,
+      ByteBuffer buf) throws IOException {
+    int originalLimit = buf.limit();
+    int ret = 0;
+
+    try {
+      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
+      buf.limit(buf.position() + ioSize);
+      ret = writeCh.write(buf);
+    } finally {
+      buf.limit(originalLimit);
+    }
+
+    return ret;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to