Repository: hbase
Updated Branches:
  refs/heads/master 9875c699a -> caa1a8494


HBASE-15709 Handle large edits for asynchronous WAL


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/caa1a849
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/caa1a849
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/caa1a849

Branch: refs/heads/master
Commit: caa1a8494aef3f87a76d5d4942daa953219df4eb
Parents: 9875c69
Author: zhangduo <zhang...@apache.org>
Authored: Tue Oct 25 11:25:10 2016 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Tue Oct 25 13:46:01 2016 +0800

----------------------------------------------------------------------
 .../asyncfs/FanOutOneBlockAsyncDFSOutput.java   | 111 ++++++++++++-------
 .../TestFanOutOneBlockAsyncDFSOutput.java       |  46 ++++++--
 .../hbase/regionserver/TestMobStoreScanner.java |   3 -
 3 files changed, 112 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/caa1a849/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 9aab924..e130381 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -42,6 +42,7 @@ import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.FutureListener;
 import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.PromiseCombiner;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -103,6 +104,10 @@ import org.apache.hadoop.util.DataChecksum;
 @InterfaceAudience.Private
 public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
+  // The MAX_PACKET_SIZE is 16MB but it includes the header size and checksum size. So here we set a
+  // smaller limit for data size.
+  private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
+
   private final Configuration conf;
 
   private final FSUtils fsUtils;
@@ -129,6 +134,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
   private final DataChecksum summer;
 
+  private final int maxDataLen;
+
   private final ByteBufAllocator alloc;
 
   private static final class Callback {
@@ -271,8 +278,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     }
 
     @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause)
-        throws Exception {
+    public void exceptionCaught(ChannelHandlerContext ctx, final Throwable cause) throws Exception {
       failed(ctx.channel(), new Supplier<Throwable>() {
 
         @Override
@@ -336,6 +342,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     this.eventLoop = eventLoop;
     this.datanodeList = datanodeList;
     this.summer = summer;
+    this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum());
     this.alloc = alloc;
     this.buf = alloc.directBuffer();
     this.state = State.STREAMING;
@@ -393,45 +400,15 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     return locatedBlock.getLocations();
   }
 
-  private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
+  private Promise<Void> flushBuffer(ByteBuf dataBuf, long nextPacketOffsetInBlock,
       boolean syncBlock) {
-    if (state != State.STREAMING) {
-      handler.failed(new IOException("stream already broken"), attachment);
-      return;
-    }
-    int dataLen = buf.readableBytes();
-    final long ackedLength = nextPacketOffsetInBlock + dataLen;
-    if (ackedLength == locatedBlock.getBlock().getNumBytes()) {
-      // no new data, just return
-      handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
-      return;
-    }
-    Promise<Void> promise = eventLoop.newPromise();
-    promise.addListener(new FutureListener<Void>() {
-
-      @Override
-      public void operationComplete(Future<Void> future) throws Exception {
-        if (future.isSuccess()) {
-          locatedBlock.getBlock().setNumBytes(ackedLength);
-          handler.completed(ackedLength, attachment);
-        } else {
-          handler.failed(future.cause(), attachment);
-        }
-      }
-    });
-    Callback c = waitingAckQueue.peekLast();
-    if (c != null && ackedLength == c.ackedLength) {
-      // just append it to the tail of waiting ack queue,, do not issue new hflush request.
-      waitingAckQueue
-          .addLast(new Callback(promise, ackedLength, Collections.<Channel> emptyList()));
-      return;
-    }
+    int dataLen = dataBuf.readableBytes();
     int chunkLen = summer.getBytesPerChecksum();
     int trailingPartialChunkLen = dataLen % chunkLen;
     int numChecks = dataLen / chunkLen + (trailingPartialChunkLen != 0 ? 1 : 0);
     int checksumLen = numChecks * summer.getChecksumSize();
     ByteBuf checksumBuf = alloc.directBuffer(checksumLen);
-    summer.calculateChunkedSums(buf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
+    summer.calculateChunkedSums(dataBuf.nioBuffer(), checksumBuf.nioBuffer(0, checksumLen));
     checksumBuf.writerIndex(checksumLen);
     PacketHeader header = new PacketHeader(4 + checksumLen + dataLen, nextPacketOffsetInBlock,
         nextPacketSeqno, false, dataLen, syncBlock);
@@ -440,14 +417,75 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     header.putInBuffer(headerBuf.nioBuffer(0, headerLen));
     headerBuf.writerIndex(headerLen);
 
+    long ackedLength = nextPacketOffsetInBlock + dataLen;
+    Promise<Void> promise = eventLoop.<Void> newPromise().addListener(future -> {
+      if (future.isSuccess()) {
+        locatedBlock.getBlock().setNumBytes(ackedLength);
+      }
+    });
     waitingAckQueue.addLast(new Callback(promise, ackedLength, datanodeList));
     for (Channel ch : datanodeList) {
       ch.write(headerBuf.duplicate().retain());
       ch.write(checksumBuf.duplicate().retain());
-      ch.writeAndFlush(buf.duplicate().retain());
+      ch.writeAndFlush(dataBuf.duplicate().retain());
     }
     checksumBuf.release();
     headerBuf.release();
+    dataBuf.release();
+    nextPacketSeqno++;
+    return promise;
+  }
+
+  private <A> void flush0(final A attachment, final CompletionHandler<Long, ? super A> handler,
+      boolean syncBlock) {
+    if (state != State.STREAMING) {
+      handler.failed(new IOException("stream already broken"), attachment);
+      return;
+    }
+    int dataLen = buf.readableBytes();
+    final long lengthAfterFlush = nextPacketOffsetInBlock + dataLen;
+    if (lengthAfterFlush == locatedBlock.getBlock().getNumBytes()) {
+      // no new data, just return
+      handler.completed(locatedBlock.getBlock().getNumBytes(), attachment);
+      return;
+    }
+    Callback c = waitingAckQueue.peekLast();
+    if (c != null && lengthAfterFlush == c.ackedLength) {
+      // just append it to the tail of waiting ack queue, do not issue new hflush request.
+      waitingAckQueue.addLast(new Callback(eventLoop.<Void> newPromise().addListener(future -> {
+        if (future.isSuccess()) {
+          handler.completed(lengthAfterFlush, attachment);
+        } else {
+          handler.failed(future.cause(), attachment);
+        }
+      }), lengthAfterFlush, Collections.<Channel> emptyList()));
+      return;
+    }
+    Promise<Void> promise;
+    if (dataLen > maxDataLen) {
+      // We need to write out the data by multiple packets as the max packet size allowed is 16M.
+      PromiseCombiner combiner = new PromiseCombiner();
+      long nextSubPacketOffsetInBlock = nextPacketOffsetInBlock;
+      for (int remaining = dataLen; remaining > 0;) {
+        int toWriteDataLen = Math.min(remaining, maxDataLen);
+        combiner.add(flushBuffer(buf.readRetainedSlice(toWriteDataLen), nextSubPacketOffsetInBlock,
+          syncBlock));
+        nextSubPacketOffsetInBlock += toWriteDataLen;
+        remaining -= toWriteDataLen;
+      }
+      promise = eventLoop.newPromise();
+      combiner.finish(promise);
+    } else {
+      promise = flushBuffer(buf.retain(), nextPacketOffsetInBlock, syncBlock);
+    }
+    promise.addListener(future -> {
+      if (future.isSuccess()) {
+        handler.completed(lengthAfterFlush, attachment);
+      } else {
+        handler.failed(future.cause(), attachment);
+      }
+    });
+    int trailingPartialChunkLen = dataLen % summer.getBytesPerChecksum();
     ByteBuf newBuf = alloc.directBuffer().ensureWritable(trailingPartialChunkLen);
     if (trailingPartialChunkLen != 0) {
       buf.readerIndex(dataLen - trailingPartialChunkLen).readBytes(newBuf, trailingPartialChunkLen);
@@ -455,7 +493,6 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
     buf.release();
     this.buf = newBuf;
     nextPacketOffsetInBlock += dataLen - trailingPartialChunkLen;
-    nextPacketSeqno++;
   }
 
   /**

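For reference, a self-contained sketch (our illustration, not part of the commit) of the two ideas above: the per-packet payload cap is aligned down to a whole number of checksum chunks, an oversized flush is carved into sub-packets of at most that length, and the per-packet promises are aggregated with Netty's PromiseCombiner so the caller still observes a single completion. The 512-byte bytesPerChecksum and the class name are assumptions for illustration.

import io.netty.util.concurrent.ImmediateEventExecutor;
import io.netty.util.concurrent.Promise;
import io.netty.util.concurrent.PromiseCombiner;

import java.util.ArrayList;
import java.util.List;

public class LargeFlushSketch {

  // Same cap as the patch: stay well below the 16MB DataNode packet limit,
  // which also has to cover the packet header and checksums.
  static final int MAX_DATA_LEN = 12 * 1024 * 1024;

  public static void main(String[] args) {
    int bytesPerChecksum = 512; // assumed typical DataChecksum chunk size
    // Align the per-packet limit down to a whole number of checksum chunks,
    // mirroring "MAX_DATA_LEN - (MAX_DATA_LEN % summer.getBytesPerChecksum())".
    int maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % bytesPerChecksum);

    int dataLen = 50 * 1024 * 1024; // the 50MB write from the new test
    long offsetInBlock = 0;
    List<Promise<Void>> subFlushes = new ArrayList<>();

    // Carve the oversized flush into sub-packets of at most maxDataLen bytes.
    for (int remaining = dataLen; remaining > 0;) {
      int toWrite = Math.min(remaining, maxDataLen);
      System.out.printf("packet: offsetInBlock=%d dataLen=%d%n", offsetInBlock, toWrite);
      subFlushes.add(ImmediateEventExecutor.INSTANCE.newPromise()); // stand-in for flushBuffer(...)
      offsetInBlock += toWrite;
      remaining -= toWrite;
    }

    // Aggregate the per-packet promises into one, as flush0 does with PromiseCombiner.
    Promise<Void> aggregate = ImmediateEventExecutor.INSTANCE.newPromise();
    PromiseCombiner combiner = new PromiseCombiner();
    for (Promise<Void> p : subFlushes) {
      combiner.add(p);
    }
    combiner.finish(aggregate);

    aggregate.addListener(f -> System.out.println(
        f.isSuccess() ? "flush acked, length=" + dataLen : "flush failed: " + f.cause()));
    for (Promise<Void> p : subFlushes) {
      p.setSuccess(null); // simulate the datanode acks arriving
    }
  }
}

With a 512-byte chunk size, 12MB is already chunk-aligned, so the 50MB write above goes out as five packets (four of 12MB and one of 2MB), each safely under the 16MB ceiling.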
http://git-wip-us.apache.org/repos/asf/hbase/blob/caa1a849/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
index 2be3b28..a6d3177 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestFanOutOneBlockAsyncDFSOutput.java
@@ -91,9 +91,9 @@ public class TestFanOutOneBlockAsyncDFSOutput {
     // will fail.
     for (;;) {
       try {
-        FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS,
-          new Path("/ensureDatanodeAlive"), true, true, (short) 3, FS.getDefaultBlockSize(),
-          EVENT_LOOP_GROUP.next());
+        FanOutOneBlockAsyncDFSOutput out =
+            FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/ensureDatanodeAlive"),
+              true, true, (short) 3, FS.getDefaultBlockSize(), EVENT_LOOP_GROUP.next());
         out.close();
         break;
       } catch (IOException e) {
@@ -104,10 +104,11 @@ public class TestFanOutOneBlockAsyncDFSOutput {
 
   static void writeAndVerify(EventLoop eventLoop, DistributedFileSystem dfs, Path f,
       final FanOutOneBlockAsyncDFSOutput out)
-          throws IOException, InterruptedException, ExecutionException {
+      throws IOException, InterruptedException, ExecutionException {
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
+        new FanOutOneBlockAsyncDFSOutputFlushHandler();
     eventLoop.execute(new Runnable() {
 
       @Override
@@ -143,7 +144,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       true, false, (short) 3, FS.getDefaultBlockSize(), eventLoop);
     final byte[] b = new byte[10];
     ThreadLocalRandom.current().nextBytes(b);
-    final FanOutOneBlockAsyncDFSOutputFlushHandler handler = new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    final FanOutOneBlockAsyncDFSOutputFlushHandler handler =
+        new FanOutOneBlockAsyncDFSOutputFlushHandler();
     eventLoop.execute(new Runnable() {
 
       @Override
@@ -218,8 +220,8 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       InvocationTargetException, InterruptedException, NoSuchFieldException {
     Field xceiverServerDaemonField = DataNode.class.getDeclaredField("dataXceiverServer");
     xceiverServerDaemonField.setAccessible(true);
-    Class<?> xceiverServerClass = Class
-        .forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
+    Class<?> xceiverServerClass =
+        Class.forName("org.apache.hadoop.hdfs.server.datanode.DataXceiverServer");
     Method numPeersMethod = xceiverServerClass.getDeclaredMethod("getNumPeers");
     numPeersMethod.setAccessible(true);
     // make one datanode broken
@@ -243,4 +245,32 @@ public class TestFanOutOneBlockAsyncDFSOutput {
       ensureAllDatanodeAlive();
     }
   }
+
+  @Test
+  public void testWriteLargeChunk() throws IOException, InterruptedException, ExecutionException {
+    Path f = new Path("/" + name.getMethodName());
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    final FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, f,
+      true, false, (short) 3, 1024 * 1024 * 1024, eventLoop);
+    byte[] b = new byte[50 * 1024 * 1024];
+    ThreadLocalRandom.current().nextBytes(b);
+    FanOutOneBlockAsyncDFSOutputFlushHandler handler =
+        new FanOutOneBlockAsyncDFSOutputFlushHandler();
+    eventLoop.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        out.write(b);
+        out.flush(null, handler, false);
+      }
+    });
+    assertEquals(b.length, handler.get());
+    out.close();
+    assertEquals(b.length, FS.getFileStatus(f).getLen());
+    byte[] actual = new byte[b.length];
+    try (FSDataInputStream in = FS.open(f)) {
+      in.readFully(actual);
+    }
+    assertArrayEquals(b, actual);
+  }
 }

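The tests wait for flush completion through FanOutOneBlockAsyncDFSOutputFlushHandler, a test-only helper. A minimal stand-in (our sketch, assuming nothing beyond java.nio's CompletionHandler contract) shows the pattern: flush(attachment, handler, syncBlock) reports the acked length through the handler on the event loop, and the test thread blocks on it.

import java.nio.channels.CompletionHandler;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Hypothetical waiter modeled on the test helper; not the HBase class itself.
public class FlushWaiterSketch implements CompletionHandler<Long, Void> {

  private final CompletableFuture<Long> future = new CompletableFuture<>();

  @Override
  public void completed(Long result, Void attachment) {
    future.complete(result); // result is the acked length within the block
  }

  @Override
  public void failed(Throwable exc, Void attachment) {
    future.completeExceptionally(exc);
  }

  public long get() throws InterruptedException, ExecutionException {
    return future.get(); // block the test thread until the flush is acked
  }

  public static void main(String[] args) throws Exception {
    FlushWaiterSketch handler = new FlushWaiterSketch();
    // In the real test this callback fires from eventLoop.execute(...) after
    // out.write(b); out.flush(null, handler, false);
    handler.completed(50L * 1024 * 1024, null); // simulate the ack for a 50MB flush
    System.out.println("acked length: " + handler.get());
  }
}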
http://git-wip-us.apache.org/repos/asf/hbase/blob/caa1a849/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
index 6a4aceb..c93c919 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMobStoreScanner.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
-import org.apache.hadoop.hbase.wal.WALFactory;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -79,8 +78,6 @@ public class TestMobStoreScanner {
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setInt("hbase.client.keyvalue.maxsize", 100 * 1024 * 1024);
-    // TODO: AsyncFSWAL can not handle large edits right now, remove this after we fix the issue.
-    TEST_UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, "filesystem");
     TEST_UTIL.startMiniCluster(1);
   }
 

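With large edits handled by the async output, TestMobStoreScanner no longer needs to pin the old WAL provider. For completeness, a hedged sketch of how a provider is pinned via WALFactory.WAL_PROVIDER; "filesystem" is the value the deleted workaround used, and the surrounding scaffolding is ours, not part of the commit.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALFactory;

public class WalProviderSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // The workaround this commit deletes: force the classic FSHLog-based WAL.
    conf.set(WALFactory.WAL_PROVIDER, "filesystem");
    System.out.println(conf.get(WALFactory.WAL_PROVIDER));
  }
}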