Repository: hbase
Updated Branches:
  refs/heads/master cc03f7ad5 -> 51d9bac42


HBASE-17048 Calcuate suitable ByteBuf size when allocating send buffer in
FanOutOneBlockAsyncDFSOutput (Ram)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/51d9bac4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/51d9bac4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/51d9bac4

Branch: refs/heads/master
Commit: 51d9bac42b30c2b1a81094970a1ce50f70de3192
Parents: cc03f7a
Author: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Authored: Tue Nov 29 09:12:47 2016 +0530
Committer: Ramkrishna <ramkrishna.s.vasude...@intel.com>
Committed: Tue Nov 29 09:12:47 2016 +0530

----------------------------------------------------------------------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   | 32 ++++++++++++++++++--
 .../TestFanOutOneBlockAsyncDFSOutput.java       | 15 +++++++++
 2 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/51d9bac4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 02ffcd5..1ac1920 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -71,6 +71,8 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An asynchronous HDFS output stream implementation which fans out data to 
datanode and only
  * supports writing file with only one block.
@@ -164,6 +166,11 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
   private long nextPacketSeqno = 0L;
 
   private ByteBuf buf;
+  // buf's initial capacity - 4KB
+  private int capacity = 4 * 1024;
+
+  // LIMIT is 128MB
+  private final int LIMIT = 128 * 1024 * 1024;
 
   private enum State {
     STREAMING, CLOSING, BROKEN, CLOSED
@@ -307,7 +314,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     this.summer = summer;
     this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % 
summer.getBytesPerChecksum());
     this.alloc = alloc;
-    this.buf = alloc.directBuffer();
+    this.buf = alloc.directBuffer(capacity);
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
   }
@@ -472,7 +479,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
       }
     });
     int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
-    ByteBuf newBuf = 
alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
+    ByteBuf newBuf = 
alloc.directBuffer(guess(dataLen)).ensureWritable(trailingPartialChunkLen);
     if (trailingPartialChunkLen != 0) {
       buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, 
trailingPartialChunkLen);
     }
@@ -543,4 +550,25 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     datanodeList.forEach(ch -> ch.closeFuture().awaitUninterruptibly());
     completeFile(client, namenode, src, clientName, locatedBlock.getBlock(), 
fileId);
   }
+
+  @VisibleForTesting
+  int guess(int bytesWritten) {
+    // if the bytesWritten is greater than the current capacity
+    // always increase the capacity in powers of 2.
+    if (bytesWritten > this.capacity) {
+      // Ensure we don't cross the LIMIT
+      if ((this.capacity << 1) <= LIMIT) {
+        // increase the capacity in the range of power of 2
+        this.capacity = this.capacity << 1;
+      }
+    } else {
+      // if we see that the bytesWritten is lesser we could again decrease
+      // the capacity by dividing it by 2 if the bytesWritten is satisfied by
+      // that reduction
+      if ((this.capacity >> 1) >= bytesWritten) {
+        this.capacity = this.capacity >> 1;
+      }
+    }
+    return this.capacity;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/51d9bac4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 7897472..f59133a 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -128,6 +128,21 @@ public class TestFanOutOneBlockAsyncDFSOutput {
   }
 
   @Test
+  public void testMaxByteBufAllocated() throws Exception {
+    Path f = new Path("/" + name.getMethodName());
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    final FanOutOneBlockAsyncDFSOutput out = 
FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
+    out.guess(5 * 1024);
+    assertEquals(8 * 1024, out.guess(5 * 1024));
+    assertEquals(16 * 1024, out.guess(10 * 1024));
+    // it wont reduce directly to 4KB
+    assertEquals(8 * 1024, out.guess(4 * 1024));
+    // This time it will reduece
+    assertEquals(4 * 1024, out.guess(4 * 1024));
+  }
+
+  @Test
   public void testRecover() throws IOException, InterruptedException, 
ExecutionException {
     Path f = new Path("/" + name.getMethodName());
     EventLoop eventLoop = EVENT_LOOP_GROUP.next();

Reply via email to