Repository: hbase Updated Branches: refs/heads/master cc03f7ad5 -> 51d9bac42
HBASE-17048 Calculate suitable ByteBuf size when allocating send buffer in FanOutOneBlockAsyncDFSOutput (Ram) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/51d9bac4 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/51d9bac4 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/51d9bac4 Branch: refs/heads/master Commit: 51d9bac42b30c2b1a81094970a1ce50f70de3192 Parents: cc03f7a Author: Ramkrishna <ramkrishna.s.vasude...@intel.com> Authored: Tue Nov 29 09:12:47 2016 +0530 Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com> Committed: Tue Nov 29 09:12:47 2016 +0530 ---------------------------------------------------------------------- .../asyncfs/FanOutOneBlockAsyncDFSOutput.java | 32 ++++++++++++++++++-- .../TestFanOutOneBlockAsyncDFSOutput.java | 15 +++++++++ 2 files changed, 45 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/51d9bac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java index 02ffcd5..1ac1920 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java @@ -71,6 +71,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.util.DataChecksum; +import com.google.common.annotations.VisibleForTesting; + /** * An asynchronous HDFS output stream implementation which fans out data to 
datanode and only * supports writing file with only one block. @@ -164,6 +166,11 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { private long nextPacketSeqno = 0L; private ByteBuf buf; + // buf's initial capacity - 4KB + private int capacity = 4 * 1024; + + // LIMIT is 128MB + private final int LIMIT = 128 * 1024 * 1024; private enum State { STREAMING, CLOSING, BROKEN, CLOSED @@ -307,7 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { this.summer = summer; this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum()); this.alloc = alloc; - this.buf = alloc.directBuffer(); + this.buf = alloc.directBuffer(capacity); this.state = State.STREAMING; setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT)); } @@ -472,7 +479,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { } }); int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum(); - ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen); + ByteBuf newBuf = alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen); if (trailingPartialChunkLen != 0) { buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen); } @@ -543,4 +550,25 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput { datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly()); completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), fileId); } + + @VisibleForTesting + int guess(int bytesWritten) { + // if the bytesWritten is greater than the current capacity + // always increase the capacity in powers of 2. 
+ if (bytesWritten > this.capacity) { + // Ensure we don't cross the LIMIT + if ((this.capacity << 1) <= LIMIT) { + // increase the capacity in the range of power of 2 + this.capacity = this.capacity << 1; + } + } else { + // if we see that the bytesWritten is smaller we could again decrease + // the capacity by dividing it by 2 if the bytesWritten is satisfied by + // that reduction + if ((this.capacity >> 1) >= bytesWritten) { + this.capacity = this.capacity >> 1; + } + } + return this.capacity; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/51d9bac4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java index 7897472..f59133a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java @@ -128,6 +128,21 @@ public class TestFanOutOneBlockAsyncDFSOutput { } @Test + public void testMaxByteBufAllocated() throws Exception { + Path f = new Path("/" + name.getMethodName()); + EventLoop eventLoop = EVENT_LOOP_GROUP.next(); + final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f, + true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop); + out.guess(5 * 1024); + assertEquals(8 * 1024, out.guess(5 * 1024)); + assertEquals(16 * 1024, out.guess(10 * 1024)); + // it won't reduce directly to 4KB + assertEquals(8 * 1024, out.guess(4 * 1024)); + // This time it will reduce + assertEquals(4 * 1024, out.guess(4 * 1024)); + } + + @Test public void testRecover() throws IOException, InterruptedException, ExecutionException { Path f = new Path("/" + 
name.getMethodName()); EventLoop eventLoop = EVENT_LOOP_GROUP.next();