Repository: spark
Updated Branches:
  refs/heads/master c5a0d1132 -> 3f4bda728


[SPARK-24578][CORE] Cap sub-region's size of returned nio buffer

## What changes were proposed in this pull request?
This PR tries to fix the performance regression introduced by SPARK-21517.

In our production jobs we run many parallel computations, so with high 
probability some task is scheduled on a host (host-2) that has to read cached 
block data from another host (host-1). This large transfer often makes the 
cluster hit timeouts (the fetch is retried 3 times, each with a 120s timeout, 
before the block is recomputed and put back into the local MemoryStore).

The root cause is that we no longer call `consolidateIfNeeded`, because we now use
```
Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
```
in `ChunkedByteBuffer`. When the buffer is made up of many small chunks, 
`buf.nioBuffer(...)` can have very bad performance in the case where we have to 
call `copyByteBuf(...)` many times.
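For illustration, here is a minimal sketch of the capped-copy pattern this patch adopts (the class and method names here are hypothetical, and the 256 KB value is an assumption standing in for the `NIO_BUFFER_LIMIT` constant already defined in `MessageWithHeader`). Instead of asking Netty for one nio buffer spanning the whole composite buffer on every call, only a bounded sub-region is materialized per write:

```
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

class CappedCopySketch {
  // Assumed cap for illustration; MessageWithHeader defines its own NIO_BUFFER_LIMIT.
  private static final int NIO_BUFFER_LIMIT = 256 * 1024;

  // Copy at most NIO_BUFFER_LIMIT bytes per call, so nioBuffer(...) never has to
  // materialize the entire (possibly many-component) buffer in one go.
  static int copyCapped(ByteBuf buf, WritableByteChannel target) throws IOException {
    int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
    ByteBuffer nio = buf.nioBuffer(buf.readerIndex(), length);
    int written = target.write(nio);
    buf.skipBytes(written);
    return written;
  }
}
```

The diff below applies this idea directly inside `MessageWithHeader.copyByteBuf(...)`, which also makes the old `writeNioBuffer(...)` helper unnecessary.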

## How was this patch tested?
Existing unit tests, and also tested in production.

Author: Wenbo Zhao <wz...@twosigma.com>

Closes #21593 from WenboZhao/spark-24578.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f4bda72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f4bda72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f4bda72

Branch: refs/heads/master
Commit: 3f4bda7289f1bfbbe8b9bc4b516007f569c44d2e
Parents: c5a0d11
Author: Wenbo Zhao <wz...@twosigma.com>
Authored: Wed Jun 20 14:26:04 2018 -0700
Committer: Shixiong Zhu <zsxw...@gmail.com>
Committed: Wed Jun 20 14:26:04 2018 -0700

----------------------------------------------------------------------
 .../network/protocol/MessageWithHeader.java     | 25 ++++----------------
 1 file changed, 5 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3f4bda72/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
----------------------------------------------------------------------
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index a533765..e7b66a6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -137,30 +137,15 @@ class MessageWithHeader extends AbstractFileRegion {
   }
 
   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
-    ByteBuffer buffer = buf.nioBuffer();
-    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-      target.write(buffer) : writeNioBuffer(target, buffer);
+    // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
+    // for the case that the passed-in buffer has too many components.
+    int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
+    ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
+    int written = target.write(buffer);
     buf.skipBytes(written);
     return written;
   }
 
-  private int writeNioBuffer(
-      WritableByteChannel writeCh,
-      ByteBuffer buf) throws IOException {
-    int originalLimit = buf.limit();
-    int ret = 0;
-
-    try {
-      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
-      buf.limit(buf.position() + ioSize);
-      ret = writeCh.write(buf);
-    } finally {
-      buf.limit(originalLimit);
-    }
-
-    return ret;
-  }
-
   @Override
   public MessageWithHeader touch(Object o) {
     super.touch(o);

