Repository: hbase
Updated Branches:
  refs/heads/branch-2 2bda22a64 -> df3668818


HBASE-19344 improve asyncWAL by using Independent thread for netty #IO in 
FanOutOneBlockAsyncDFSOutput


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df366881
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df366881
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df366881

Branch: refs/heads/branch-2
Commit: df3668818de6b5d78f7e22911186909ad6aaf113
Parents: 2bda22a
Author: zhangduo <zhang...@apache.org>
Authored: Thu Nov 30 22:02:10 2017 +0800
Committer: zhangduo <zhang...@apache.org>
Committed: Fri Dec 1 11:19:09 2017 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/wal/AbstractFSWAL.java   |   6 +-
 .../hbase/regionserver/wal/AsyncFSWAL.java      | 252 ++++++++++---------
 .../wal/AsyncProtobufLogWriter.java             |  26 +-
 .../hbase/regionserver/wal/FSWALEntry.java      |  14 --
 .../hbase/regionserver/wal/RingBufferTruck.java |   6 +-
 .../wal/SecureAsyncProtobufLogWriter.java       |  11 +-
 .../hadoop/hbase/wal/AsyncFSWALProvider.java    |  17 +-
 .../hbase/regionserver/wal/TestAsyncFSWAL.java  |   4 +-
 .../regionserver/wal/TestAsyncWALReplay.java    |   2 +-
 9 files changed, 168 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 64f44cd..534315e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -969,11 +969,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL {
     try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
       FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
       entry.stampRegionSequenceId(we);
-      if (scope != null) {
-        ringBuffer.get(txid).load(entry, scope.getSpan());
-      } else {
-        ringBuffer.get(txid).load(entry, null);
-      }
+      ringBuffer.get(txid).load(entry);
     } finally {
       ringBuffer.publish(txid);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 9aad2bc..18007aa 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -19,6 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal;
 
 import static 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate;
 
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.Sequence;
+import com.lmax.disruptor.Sequencer;
+
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Field;
@@ -32,6 +36,9 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -44,28 +51,25 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.trace.TraceUtil;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
-import 
org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-import 
org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.ipc.RemoteException;
-import org.apache.htrace.core.Span;
 import org.apache.htrace.core.TraceScope;
+import org.apache.yetus.audience.InterfaceAudience;
 
-import com.lmax.disruptor.RingBuffer;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.Sequencer;
+import 
org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import 
org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor;
 
 /**
  * An asynchronous implementation of FSWAL.
@@ -81,7 +85,7 @@ import com.lmax.disruptor.Sequencer;
  * </li>
  * </ol>
  * </li>
- * <li>In the consumer task(in the EventLoop thread)
+ * <li>In the consumer task(executed in a single threaded thread pool)
  * <ol>
  * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into
  * {@link #toWriteAppends}</li>
@@ -117,14 +121,12 @@ import com.lmax.disruptor.Sequencer;
  * signal the {@link #readyForRollingCond}.</li>
  * <li>Back to the log roller thread, now we can confirm that there are no 
out-going entries, i.e.,
  * we reach a safe point. So it is safe to replace old writer with new writer 
now.</li>
- * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false, cancel log 
roller exit checker
- * if any(see the comments in the {@link #syncFailed(Throwable)} method to see 
why we need a checker
- * here).</li>
+ * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li>
  * <li>Schedule the consumer task.</li>
  * <li>Schedule a background task to close the old writer.</li>
  * </ol>
  * For a broken writer roll request, the only difference is that we can bypass 
the wait for safe
- * point stage. See the comments in the {@link #syncFailed(Throwable)} method 
for more details.
+ * point stage.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
@@ -142,7 +144,13 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> 
{
   public static final String ASYNC_WAL_CREATE_MAX_RETRIES = 
"hbase.wal.async.create.retries";
   public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10;
 
-  private final EventLoop eventLoop;
+  public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP =
+    "hbase.wal.async.use-shared-event-loop";
+  public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = true;
+
+  private final EventLoopGroup eventLoopGroup;
+
+  private final ExecutorService consumeExecutor;
 
   private final Class<? extends Channel> channelClass;
 
@@ -153,8 +161,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> 
{
   // check if there is already a consumer task in the event loop's task queue
   private final Supplier<Boolean> hasConsumerTask;
 
-  // new writer is created and we are waiting for old writer to be closed.
-  private volatile boolean waitingRoll;
+  private static final int MAX_EPOCH = 0x3FFFFFFF;
+  // the lowest bit is waitingRoll, which means new writer is created and we 
are waiting for old
+  // writer to be closed.
+  // the second lowest bit is writerBroken which means the current writer is 
broken and rollWriter
+  // is needed.
+  // all other bits are the epoch number of the current writer, this is used 
to detect whether the
+  // writer is still the one when you issue the sync.
+  // notice that, modification to this field is only allowed under the 
protection of consumeLock.
+  private volatile int epochAndState;
+
+  // used to guard the log roll request when we exceed the log roll size.
+  private boolean rollRequested;
 
   private boolean readyForRolling;
 
@@ -166,9 +184,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
 
   private final AtomicBoolean consumerScheduled = new AtomicBoolean(false);
 
-  // writer is broken and rollWriter is needed.
-  private volatile boolean writerBroken;
-
   private final long batchSize;
 
   private final int createMaxRetries;
@@ -194,32 +209,41 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
 
   public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String 
archiveDir,
       Configuration conf, List<WALActionsListener> listeners, boolean 
failIfWALExists,
-      String prefix, String suffix, EventLoop eventLoop, Class<? extends 
Channel> channelClass)
-      throws FailedLogCloseException, IOException {
+      String prefix, String suffix, EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) throws FailedLogCloseException, 
IOException {
     super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, 
prefix, suffix);
-    this.eventLoop = eventLoop;
+    this.eventLoopGroup = eventLoopGroup;
     this.channelClass = channelClass;
     Supplier<Boolean> hasConsumerTask;
-    if (eventLoop instanceof SingleThreadEventExecutor) {
-
-      try {
-        Field field = 
SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
-        field.setAccessible(true);
-        Queue<?> queue = (Queue<?>) field.get(eventLoop);
-        hasConsumerTask = () -> queue.peek() == consumer;
-      } catch (Exception e) {
-        LOG.warn("Can not get task queue of " + eventLoop + ", this is not 
necessary, just give up",
-          e);
+    if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, 
DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) {
+      this.consumeExecutor = eventLoopGroup.next();
+      if (consumeExecutor instanceof SingleThreadEventExecutor) {
+        try {
+          Field field = 
SingleThreadEventExecutor.class.getDeclaredField("taskQueue");
+          field.setAccessible(true);
+          Queue<?> queue = (Queue<?>) field.get(consumeExecutor);
+          hasConsumerTask = () -> queue.peek() == consumer;
+        } catch (Exception e) {
+          LOG.warn("Can not get task queue of " + consumeExecutor +
+            ", this is not necessary, just give up", e);
+          hasConsumerTask = () -> false;
+        }
+      } else {
         hasConsumerTask = () -> false;
       }
     } else {
-      hasConsumerTask = () -> false;
+      ThreadPoolExecutor threadPool =
+        new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new 
LinkedBlockingQueue<Runnable>(),
+            new 
ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build());
+      hasConsumerTask = () -> threadPool.getQueue().peek() == consumer;
+      this.consumeExecutor = threadPool;
     }
+
     this.hasConsumerTask = hasConsumerTask;
     int preallocatedEventCount =
-        this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 
* 16);
+      conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16);
     waitingConsumePayloads =
-        RingBuffer.createMultiProducer(RingBufferTruck::new, 
preallocatedEventCount);
+      RingBuffer.createMultiProducer(RingBufferTruck::new, 
preallocatedEventCount);
     waitingConsumePayloadsGatingSequence = new 
Sequence(Sequencer.INITIAL_CURSOR_VALUE);
     
waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence);
 
@@ -229,23 +253,35 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
 
     batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE);
     createMaxRetries =
-        conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, 
DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
+      conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, 
DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES);
     rollWriter();
   }
 
+  private static boolean waitingRoll(int epochAndState) {
+    return (epochAndState & 1) != 0;
+  }
+
+  private static boolean writerBroken(int epochAndState) {
+    return ((epochAndState >>> 1) & 1) != 0;
+  }
+
+  private static int epoch(int epochAndState) {
+    return epochAndState >>> 2;
+  }
+
   // return whether we have successfully set readyForRolling to true.
   private boolean trySetReadyForRolling() {
     // Check without holding lock first. Usually we will just return here.
     // waitingRoll is volatile and unacedEntries is only accessed inside event 
loop so it is safe to
     // check them outside the consumeLock.
-    if (!waitingRoll || !unackedAppends.isEmpty()) {
+    if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) {
       return false;
     }
     consumeLock.lock();
     try {
       // 1. a roll is requested
       // 2. all out-going entries have been acked(we have confirmed above).
-      if (waitingRoll) {
+      if (waitingRoll(epochAndState)) {
         readyForRolling = true;
         readyForRollingCond.signalAll();
         return true;
@@ -257,26 +293,25 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
     }
   }
 
-  private void syncFailed(Throwable error) {
+  private void syncFailed(long epochWhenSync, Throwable error) {
     LOG.warn("sync failed", error);
-    // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput 
and netty.
-    // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending 
flush requests. It
-    // is execute inside EventLoop. And in DefaultPromise in netty, it will 
notifyListener
-    // directly if it is already in the EventLoop thread. And in the listener 
method, it will
-    // call us. So here we know that all failed flush request will call us 
continuously, and
-    // before the last one finish, no other task can be executed in EventLoop. 
So here we are
-    // safe to use writerBroken as a guard.
-    // Do not forget to revisit this if we change the implementation of
-    // FanOutOneBlockAsyncDFSOutput!
+    boolean shouldRequestLogRoll = true;
     consumeLock.lock();
     try {
-      if (writerBroken) {
+      int currentEpochAndState = epochAndState;
+      if (epoch(currentEpochAndState) != epochWhenSync || 
writerBroken(currentEpochAndState)) {
+        // this is not the previous writer which means we have already rolled 
the writer.
+        // or this is still the current writer, but we have already marked it 
as broken and requested
+        // a roll.
         return;
       }
-      writerBroken = true;
-      if (waitingRoll) {
+      this.epochAndState = currentEpochAndState | 0b10;
+      if (waitingRoll(currentEpochAndState)) {
         readyForRolling = true;
         readyForRollingCond.signalAll();
+        // this means we are already in the middle of a rollWriter so just 
tell the roller thread
+        // that you can continue without requesting an extra log roll.
+        shouldRequestLogRoll = false;
       }
     } finally {
       consumeLock.unlock();
@@ -285,8 +320,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> 
{
       toWriteAppends.addFirst(iter.next());
     }
     highestUnsyncedTxid = highestSyncedTxid.get();
-    // request a roll.
-    requestLogRoll();
+    if (shouldRequestLogRoll) {
+      // request a roll.
+      requestLogRoll();
+    }
   }
 
   private void syncCompleted(AsyncWriter writer, long processedTxid, long 
startTimeNs) {
@@ -299,30 +336,16 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
       }
     }
     postSync(System.nanoTime() - startTimeNs, finishSync(true));
-    // Ideally, we should set a flag to indicate that the log roll has already 
been requested for
-    // the current writer and give up here, and reset the flag when roll is 
finished. But we
-    // finish roll in the log roller thread so the flag need to be set by 
different thread which
-    // typically means we need to use a lock to protect it and do fencing. As 
the log roller will
-    // aggregate the roll requests of the same WAL, so it is safe to call 
requestLogRoll multiple
-    // times before the roll actual happens. But we need to stop if we set 
readyForRolling to true
-    // and wake up the log roller thread waiting in waitForSafePoint as the 
rollWriter call may
-    // return firstly and then we run the code below and request a roll on the 
new writer.
     if (trySetReadyForRolling()) {
       // we have just finished a roll, then do not need to check for log 
rolling, the writer will be
       // closed soon.
       return;
     }
-    if (writer.getLength() < logrollsize) {
+    if (writer.getLength() < logrollsize || rollRequested) {
       return;
     }
-    if (!rollWriterLock.tryLock()) {
-      return;
-    }
-    try {
-      requestLogRoll();
-    } finally {
-      rollWriterLock.unlock();
-    }
+    rollRequested = true;
+    requestLogRoll();
   }
 
   private void sync(AsyncWriter writer) {
@@ -330,19 +353,20 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
     long currentHighestProcessedAppendTxid = highestProcessedAppendTxid;
     highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid;
     final long startTimeNs = System.nanoTime();
-    writer.sync().whenComplete((result, error) -> {
+    final long epoch = epochAndState >>> 2;
+    writer.sync().whenCompleteAsync((result, error) -> {
       if (error != null) {
-        syncFailed(error);
+        syncFailed(epoch, error);
       } else {
         syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs);
       }
-    });
+    }, consumeExecutor);
   }
 
   private void addTimeAnnotation(SyncFuture future, String annotation) {
     TraceUtil.addTimelineAnnotation(annotation);
-    //TODO handle htrace API change, see HBASE-18895
-    //future.setSpan(scope.getSpan());
+    // TODO handle htrace API change, see HBASE-18895
+    // future.setSpan(scope.getSpan());
   }
 
   private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) {
@@ -410,26 +434,10 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
     for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); 
iter.hasNext();) {
       FSWALEntry entry = iter.next();
       boolean appended;
-      Span span = entry.detachSpan();
-      // the span maybe null if this is a retry after rolling.
-      if (span != null) {
-        //TODO handle htrace API change, see HBASE-18895
-        //TraceScope scope = Trace.continueSpan(span);
-        try {
-          appended = append(writer, entry);
-        } catch (IOException e) {
-          throw new AssertionError("should not happen", e);
-        } finally {
-          //TODO handle htrace API change, see HBASE-18895
-          //assert scope == NullScope.INSTANCE || !scope.isDetached();
-          //scope.close(); // append scope is complete
-        }
-      } else {
-        try {
-          appended = append(writer, entry);
-        } catch (IOException e) {
-          throw new AssertionError("should not happen", e);
-        }
+      try {
+        appended = append(writer, entry);
+      } catch (IOException e) {
+        throw new AssertionError("should not happen", e);
       }
       newHighestProcessedAppendTxid = entry.getTxid();
       iter.remove();
@@ -472,10 +480,11 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
   private void consume() {
     consumeLock.lock();
     try {
-      if (writerBroken) {
+      int currentEpochAndState = epochAndState;
+      if (writerBroken(currentEpochAndState)) {
         return;
       }
-      if (waitingRoll) {
+      if (waitingRoll(currentEpochAndState)) {
         if (writer.getLength() > fileLengthAtLastSync) {
           // issue a sync
           sync(writer);
@@ -491,8 +500,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
       consumeLock.unlock();
     }
     long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1;
-    for (long cursorBound =
-        waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; 
nextCursor++) {
+    for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= 
cursorBound;
+      nextCursor++) {
       if (!waitingConsumePayloads.isPublished(nextCursor)) {
         break;
       }
@@ -540,11 +549,12 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
       }
     }
     // reschedule if we still have something to write.
-    eventLoop.execute(consumer);
+    consumeExecutor.execute(consumer);
   }
 
   private boolean shouldScheduleConsumer() {
-    if (writerBroken || waitingRoll) {
+    int currentEpochAndState = epochAndState;
+    if (writerBroken(currentEpochAndState) || 
waitingRoll(currentEpochAndState)) {
       return false;
     }
     return consumerScheduled.compareAndSet(false, true);
@@ -554,16 +564,16 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
   public long append(RegionInfo hri, WALKey key, WALEdit edits, boolean 
inMemstore)
       throws IOException {
     long txid =
-        stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, 
waitingConsumePayloads);
+      stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, 
waitingConsumePayloads);
     if (shouldScheduleConsumer()) {
-      eventLoop.execute(consumer);
+      consumeExecutor.execute(consumer);
     }
     return txid;
   }
 
   @Override
   public void sync() throws IOException {
-    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")){
+    try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) {
       long txid = waitingConsumePayloads.next();
       SyncFuture future;
       try {
@@ -574,7 +584,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
         waitingConsumePayloads.publish(txid);
       }
       if (shouldScheduleConsumer()) {
-        eventLoop.execute(consumer);
+        consumeExecutor.execute(consumer);
       }
       blockOnSync(future);
     }
@@ -597,7 +607,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
         waitingConsumePayloads.publish(sequence);
       }
       if (shouldScheduleConsumer()) {
-        eventLoop.execute(consumer);
+        consumeExecutor.execute(consumer);
       }
       blockOnSync(future);
     }
@@ -608,7 +618,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     boolean overwrite = false;
     for (int retry = 0;; retry++) {
       try {
-        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, 
eventLoop,
+        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, 
eventLoopGroup,
           channelClass);
       } catch (RemoteException e) {
         LOG.warn("create wal log writer " + path + " failed, retry = " + 
retry, e);
@@ -643,20 +653,21 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
         }
       }
     }
-    throw new IOException("Failed to create wal log writer " + path + " after 
retrying "
-        + createMaxRetries + " time(s)");
+    throw new IOException("Failed to create wal log writer " + path + " after 
retrying " +
+      createMaxRetries + " time(s)");
   }
 
   private void waitForSafePoint() {
     consumeLock.lock();
     try {
-      if (writerBroken || this.writer == null) {
+      int currentEpochAndState = epochAndState;
+      if (writerBroken(currentEpochAndState) || this.writer == null) {
         return;
       }
       consumerScheduled.set(true);
-      waitingRoll = true;
+      epochAndState = currentEpochAndState | 1;
       readyForRolling = false;
-      eventLoop.execute(consumer);
+      consumeExecutor.execute(consumer);
       while (!readyForRolling) {
         readyForRollingCond.awaitUninterruptibly();
       }
@@ -674,13 +685,17 @@ public class AsyncFSWAL extends 
AbstractFSWAL<AsyncWriter> {
     if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) {
       this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
     }
-    this.fileLengthAtLastSync = 0L;
+    this.fileLengthAtLastSync = nextWriter.getLength();
+    this.rollRequested = false;
     this.highestProcessedAppendTxidAtLastSync = 0L;
     consumeLock.lock();
     try {
       consumerScheduled.set(true);
-      writerBroken = waitingRoll = false;
-      eventLoop.execute(consumer);
+      int currentEpoch = epochAndState >>> 2;
+      int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1;
+      // set a new epoch and also clear waitingRoll and writerBroken
+      this.epochAndState = nextEpoch << 2;
+      consumeExecutor.execute(consumer);
     } finally {
       consumeLock.unlock();
     }
@@ -710,6 +725,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
     closeExecutor.shutdown();
     IOException error = new IOException("WAL has been closed");
     syncFutures.forEach(f -> f.done(f.getTxid(), error));
+    if (!(consumeExecutor instanceof EventLoop)) {
+      consumeExecutor.shutdown();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
index f3c5bf2..482500e 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -17,11 +17,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
-
-import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
@@ -35,15 +30,19 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.ByteBufferWriter;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
 import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 import 
org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException;
 import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer;
 
 /**
  * AsyncWriter for protobuf-based WAL.
@@ -54,7 +53,7 @@ public class AsyncProtobufLogWriter extends 
AbstractProtobufLogWriter
 
   private static final Log LOG = 
LogFactory.getLog(AsyncProtobufLogWriter.class);
 
-  private final EventLoop eventLoop;
+  private final EventLoopGroup eventLoopGroup;
 
   private final Class<? extends Channel> channelClass;
 
@@ -103,8 +102,9 @@ public class AsyncProtobufLogWriter extends 
AbstractProtobufLogWriter
 
   private OutputStream asyncOutputWrapper;
 
-  public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> 
channelClass) {
-    this.eventLoop = eventLoop;
+  public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) {
+    this.eventLoopGroup = eventLoopGroup;
     this.channelClass = channelClass;
   }
 
@@ -156,13 +156,13 @@ public class AsyncProtobufLogWriter extends 
AbstractProtobufLogWriter
   protected void initOutput(FileSystem fs, Path path, boolean overwritable, 
int bufferSize,
       short replication, long blockSize) throws IOException, 
StreamLacksCapabilityException {
     this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, 
false, replication,
-        blockSize, eventLoop, channelClass);
+        blockSize, eventLoopGroup, channelClass);
     this.asyncOutputWrapper = new OutputStreamWrapper(output);
   }
 
   private long write(Consumer<CompletableFuture<Long>> action) throws 
IOException {
     CompletableFuture<Long> future = new CompletableFuture<>();
-    eventLoop.execute(() -> action.accept(future));
+    action.accept(future);
     try {
       return future.get().longValue();
     } catch (InterruptedException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
index debe9e4..a928ad5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.CollectionUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.htrace.core.Span;
 import org.apache.yetus.audience.InterfaceAudience;
 
 import 
org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
@@ -58,9 +57,6 @@ class FSWALEntry extends Entry {
   private final transient RegionInfo regionInfo;
   private final transient Set<byte[]> familyNames;
 
-  // The tracing span for this entry when writing WAL.
-  private transient Span span;
-
   FSWALEntry(final long txid, final WALKey key, final WALEdit edit,
       final RegionInfo regionInfo, final boolean inMemstore) {
     super(key, edit);
@@ -130,14 +126,4 @@ class FSWALEntry extends Entry {
   Set<byte[]> getFamilyNames() {
     return familyNames;
   }
-
-  void attachSpan(Span span) {
-    this.span = span;
-  }
-
-  Span detachSpan() {
-    Span span = this.span;
-    this.span = null;
-    return span;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
index 021f6a1..dfef429 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver.wal;
 
-import org.apache.htrace.core.Span;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
@@ -43,10 +42,9 @@ final class RingBufferTruck {
   private FSWALEntry entry;
 
   /**
-   * Load the truck with a {@link FSWALEntry} and associated {@link Span}.
+   * Load the truck with a {@link FSWALEntry}.
    */
-  void load(FSWALEntry entry, Span span) {
-    entry.attachSpan(span);
+  void load(FSWALEntry entry) {
     this.entry = entry;
     this.type = Type.APPEND;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
index a686a1b..fd7387b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java
@@ -21,20 +21,21 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.io.crypto.Encryptor;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
+import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader;
 
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter {
 
   private Encryptor encryptor = null;
 
-  public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) {
-    super(eventLoop, channelClass);
+  public SecureAsyncProtobufLogWriter(EventLoopGroup eventLoopGroup,
+      Class<? extends Channel> channelClass) {
+    super(eventLoopGroup, channelClass);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
index bf3b2ad..5cb0189 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java
@@ -34,7 +34,6 @@ import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
-import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel;
@@ -69,7 +68,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
         getWALDirectoryName(factory.factoryId),
         getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix,
         META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null,
-        eventLoopGroup.next(), channelClass);
+        eventLoopGroup, channelClass);
   }
 
   @Override
@@ -90,23 +89,23 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> {
    * public because of AsyncFSWAL. Should be package-private
    */
   public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path,
-      boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass)
+      boolean overwritable, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass)
       throws IOException {
     // Configuration already does caching for the Class lookup.
     Class<? extends AsyncWriter> logWriterClass = conf.getClass(
       "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class);
     try {
-      AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class)
-          .newInstance(eventLoop, channelClass);
+      AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class)
+          .newInstance(eventLoopGroup, channelClass);
       writer.init(fs, path, conf, overwritable);
       return writer;
     } catch (Exception e) {
       if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
         LOG.error("The RegionServer async write ahead log provider " +
-            "relies on the ability to call " + e.getMessage() + " for proper operation during " +
-            "component failures, but the current FileSystem does not support doing so. Please " +
-            "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
-            "it points to a FileSystem mount that has suitable capabilities for output streams.");
+          "relies on the ability to call " + e.getMessage() + " for proper operation during " +
+          "component failures, but the current FileSystem does not support doing so. Please " +
+          "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " +
+          "it points to a FileSystem mount that has suitable capabilities for output streams.");
       } else {
         LOG.debug("Error instantiating log writer.", e);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
index 2ae916f..75a64aa 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java
@@ -63,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
       String prefix, String suffix) throws IOException {
     return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
-        suffix, GROUP.next(), CHANNEL_CLASS);
+        suffix, GROUP, CHANNEL_CLASS);
   }
 
   @Override
@@ -72,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL {
       boolean failIfWALExists, String prefix, String suffix, final Runnable action)
       throws IOException {
     return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix,
-        suffix, GROUP.next(), CHANNEL_CLASS) {
+        suffix, GROUP, CHANNEL_CLASS) {
 
       @Override
       void atHeadOfRingBufferEventHandlerAppend() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
index 881cf7c..8de15df 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java
@@ -62,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay {
   @Override
   protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException {
     return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName,
-        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS);
+        HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS);
   }
 }

Reply via email to