spark git commit: [SPARK-24578][CORE] Cap sub-region's size of returned nio buffer

2018-06-20 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 d687d97b1 -> 8928de3cd


[SPARK-24578][CORE] Cap sub-region's size of returned nio buffer

## What changes were proposed in this pull request?
This PR tries to fix the performance regression introduced by SPARK-21517.

Our production jobs run many parallel computations, so with high probability some
task gets scheduled on host-2 while the cached block data it needs lives on host-1.
This large remote transfer often causes the cluster to hit timeouts (the fetch is
retried 3 times, each with a 120s timeout, after which the block is recomputed and
put back into the local MemoryStore).

The root cause is that we no longer do `consolidateIfNeeded`, because we now use
```
Unpooled.wrappedBuffer(chunks.length, getChunks(): _*)
```
in ChunkedByteBuffer. If the composite buffer holds many small chunks,
`buf.nioBuffer(...)` can perform very badly when we have to call
`copyByteBuf(...)` many times.
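
To make the cost concrete: if the wrapped buffer has m components and converting it to a single nio buffer has to touch every component, then repeating that conversion on every `copyByteBuf(...)` call does O(m²) copying work overall. A deliberately simplified, stdlib-only Java model of that effect (the names `mergeAll` and `bytesCopied` are illustrative, not Netty's actual API):

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class ChunkMergeCost {
    static long bytesCopied = 0;

    // Model of turning a composite into one nio buffer: merging m chunks
    // copies every byte of every chunk into a freshly allocated buffer.
    static ByteBuffer mergeAll(List<ByteBuffer> chunks) {
        int total = 0;
        for (ByteBuffer c : chunks) total += c.remaining();
        ByteBuffer merged = ByteBuffer.allocate(total);
        for (ByteBuffer c : chunks) {
            merged.put(c.duplicate());   // duplicate() so the source position is untouched
            bytesCopied += c.remaining();
        }
        merged.flip();
        return merged;
    }

    public static void main(String[] args) {
        int m = 1_000, chunkSize = 32;   // many small chunks
        List<ByteBuffer> chunks = new ArrayList<>();
        for (int i = 0; i < m; i++) chunks.add(ByteBuffer.allocate(chunkSize));

        // If every copyByteBuf call re-merges the whole composite and the
        // transfer takes m calls, the copying work is quadratic in m.
        for (int call = 0; call < m; call++) mergeAll(chunks);
        System.out.println(bytesCopied);   // 32000000 = m * m * chunkSize
    }
}
```

This is why consolidating once (or, as this patch does, capping how much of the composite each call asks for) brings the work back to O(m).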

## How was this patch tested?
Existing unit tests, plus testing in production.

Author: Wenbo Zhao 

Closes #21593 from WenboZhao/spark-24578.

(cherry picked from commit 3f4bda7289f1bfbbe8b9bc4b516007f569c44d2e)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8928de3c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8928de3c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8928de3c

Branch: refs/heads/branch-2.3
Commit: 8928de3cd448922d43f9ae80cea7138ecbea0d24
Parents: d687d97
Author: Wenbo Zhao 
Authored: Wed Jun 20 14:26:04 2018 -0700
Committer: Shixiong Zhu 
Committed: Wed Jun 20 14:26:32 2018 -0700

--
 .../network/protocol/MessageWithHeader.java | 25 
 1 file changed, 5 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8928de3c/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
--
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
index a533765..e7b66a6 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/MessageWithHeader.java
@@ -137,30 +137,15 @@ class MessageWithHeader extends AbstractFileRegion {
   }
 
   private int copyByteBuf(ByteBuf buf, WritableByteChannel target) throws IOException {
-    ByteBuffer buffer = buf.nioBuffer();
-    int written = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
-      target.write(buffer) : writeNioBuffer(target, buffer);
+    // SPARK-24578: cap the sub-region's size of returned nio buffer to improve the performance
+    // for the case that the passed-in buffer has too many components.
+    int length = Math.min(buf.readableBytes(), NIO_BUFFER_LIMIT);
+    ByteBuffer buffer = buf.nioBuffer(buf.readerIndex(), length);
+    int written = target.write(buffer);
     buf.skipBytes(written);
     return written;
   }
 
-  private int writeNioBuffer(
-      WritableByteChannel writeCh,
-      ByteBuffer buf) throws IOException {
-    int originalLimit = buf.limit();
-    int ret = 0;
-
-    try {
-      int ioSize = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
-      buf.limit(buf.position() + ioSize);
-      ret = writeCh.write(buf);
-    } finally {
-      buf.limit(originalLimit);
-    }
-
-    return ret;
-  }
-
   @Override
   public MessageWithHeader touch(Object o) {
     super.touch(o);
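
Capping each call is safe for correctness because the transfer layer keeps invoking `copyByteBuf` until all bytes have been written. A stdlib-only sketch of the same capped-slice pattern on a plain `ByteBuffer` (the class and method names, the driver loop, and the 256 KiB limit here are illustrative stand-ins, not Spark's exact code):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;

public class TransferLoop {
    // Illustrative stand-in for Spark's NIO_BUFFER_LIMIT.
    static final int NIO_BUFFER_LIMIT = 256 * 1024;

    // One capped write, analogous to the patched copyByteBuf: ask for at most
    // NIO_BUFFER_LIMIT bytes starting at the current read position, write them,
    // then advance the position by however much the channel accepted.
    static int copyOnce(ByteBuffer buf, WritableByteChannel target) throws IOException {
        int length = Math.min(buf.remaining(), NIO_BUFFER_LIMIT);
        ByteBuffer slice = buf.duplicate();
        slice.limit(slice.position() + length);
        int written = target.write(slice);
        buf.position(buf.position() + written);
        return written;
    }

    public static void main(String[] args) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(new byte[3 * NIO_BUFFER_LIMIT + 17]);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        WritableByteChannel ch = Channels.newChannel(out);
        int calls = 0;
        while (buf.hasRemaining()) {   // driver loop: the caller re-invokes until done
            copyOnce(buf, ch);
            calls++;
        }
        System.out.println(calls);       // 4: three full 256 KiB slices plus the 17-byte tail
        System.out.println(out.size());  // 786449
    }
}
```

Note that `copyOnce` advances the position only by what the channel actually wrote, so a partial write simply means the tail is retried on the next call, just as `buf.skipBytes(written)` does in the patch.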


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-24578][CORE] Cap sub-region's size of returned nio buffer

2018-06-20 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master c5a0d1132 -> 3f4bda728


[SPARK-24578][CORE] Cap sub-region's size of returned nio buffer



Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f4bda72
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f4bda72
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f4bda72

Branch: refs/heads/master
Commit: 3f4bda7289f1bfbbe8b9bc4b516007f569c44d2e
Parents: c5a0d11
Author: Wenbo Zhao 
Authored: Wed Jun 20 14:26:04 2018 -0700
Committer: Shixiong Zhu 
Committed: Wed Jun 20 14:26:04 2018 -0700
