Repository: hbase Updated Branches: refs/heads/branch-2 2bda22a64 -> df3668818
HBASE-19344 improve asyncWAL by using Independent thread for netty #IO in FanOutOneBlockAsyncDFSOutput Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/df366881 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/df366881 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/df366881 Branch: refs/heads/branch-2 Commit: df3668818de6b5d78f7e22911186909ad6aaf113 Parents: 2bda22a Author: zhangduo <zhang...@apache.org> Authored: Thu Nov 30 22:02:10 2017 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Fri Dec 1 11:19:09 2017 +0800 ---------------------------------------------------------------------- .../hbase/regionserver/wal/AbstractFSWAL.java | 6 +- .../hbase/regionserver/wal/AsyncFSWAL.java | 252 ++++++++++--------- .../wal/AsyncProtobufLogWriter.java | 26 +- .../hbase/regionserver/wal/FSWALEntry.java | 14 -- .../hbase/regionserver/wal/RingBufferTruck.java | 6 +- .../wal/SecureAsyncProtobufLogWriter.java | 11 +- .../hadoop/hbase/wal/AsyncFSWALProvider.java | 17 +- .../hbase/regionserver/wal/TestAsyncFSWAL.java | 4 +- .../regionserver/wal/TestAsyncWALReplay.java | 2 +- 9 files changed, 168 insertions(+), 170 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 64f44cd..534315e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -969,11 +969,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> 
implements WAL { try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) { FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore); entry.stampRegionSequenceId(we); - if (scope != null) { - ringBuffer.get(txid).load(entry, scope.getSpan()); - } else { - ringBuffer.get(txid).load(entry, null); - } + ringBuffer.get(txid).load(entry); } finally { ringBuffer.publish(txid); } http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 9aad2bc..18007aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -19,6 +19,10 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; +import com.lmax.disruptor.RingBuffer; +import com.lmax.disruptor.Sequence; +import com.lmax.disruptor.Sequencer; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.Field; @@ -32,6 +36,9 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -44,28 +51,25 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.trace.TraceUtil; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.ConnectionUtils; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; +import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; -import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; -import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.ipc.RemoteException; -import org.apache.htrace.core.Span; import org.apache.htrace.core.TraceScope; +import org.apache.yetus.audience.InterfaceAudience; -import com.lmax.disruptor.RingBuffer; -import com.lmax.disruptor.Sequence; -import com.lmax.disruptor.Sequencer; +import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.io.netty.util.concurrent.SingleThreadEventExecutor; /** * An asynchronous implementation of FSWAL. 
@@ -81,7 +85,7 @@ import com.lmax.disruptor.Sequencer; * </li> * </ol> * </li> - * <li>In the consumer task(in the EventLoop thread) + * <li>In the consumer task(executed in a single threaded thread pool) * <ol> * <li>Poll the entry from {@link #waitingConsumePayloads} and insert it into * {@link #toWriteAppends}</li> @@ -117,14 +121,12 @@ import com.lmax.disruptor.Sequencer; * signal the {@link #readyForRollingCond}.</li> * <li>Back to the log roller thread, now we can confirm that there are no out-going entries, i.e., * we reach a safe point. So it is safe to replace old writer with new writer now.</li> - * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false, cancel log roller exit checker - * if any(see the comments in the {@link #syncFailed(Throwable)} method to see why we need a checker - * here).</li> + * <li>Set {@link #writerBroken} and {@link #waitingRoll} to false.</li> * <li>Schedule the consumer task.</li> * <li>Schedule a background task to close the old writer.</li> * </ol> * For a broken writer roll request, the only difference is that we can bypass the wait for safe - * point stage. See the comments in the {@link #syncFailed(Throwable)} method for more details. + * point stage. */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { @@ -142,7 +144,13 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries"; public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; - private final EventLoop eventLoop; + public static final String ASYNC_WAL_USE_SHARED_EVENT_LOOP = + "hbase.wal.async.use-shared-event-loop"; + public static final boolean DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP = true; + + private final EventLoopGroup eventLoopGroup; + + private final ExecutorService consumeExecutor; private final Class<? 
extends Channel> channelClass; @@ -153,8 +161,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { // check if there is already a consumer task in the event loop's task queue private final Supplier<Boolean> hasConsumerTask; - // new writer is created and we are waiting for old writer to be closed. - private volatile boolean waitingRoll; + private static final int MAX_EPOCH = 0x3FFFFFFF; + // the lowest bit is waitingRoll, which means new writer is created and we are waiting for old + // writer to be closed. + // the second lowest bit is writerBroken which means the current writer is broken and rollWriter + // is needed. + // all other bits are the epoch number of the current writer, this is used to detect whether the + // writer is still the one when you issue the sync. + // notice that, modification to this field is only allowed under the protection of consumeLock. + private volatile int epochAndState; + + // used to guard the log roll request when we exceed the log roll size. + private boolean rollRequested; private boolean readyForRolling; @@ -166,9 +184,6 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { private final AtomicBoolean consumerScheduled = new AtomicBoolean(false); - // writer is broken and rollWriter is needed. - private volatile boolean writerBroken; - private final long batchSize; private final int createMaxRetries; @@ -194,32 +209,41 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, - String prefix, String suffix, EventLoop eventLoop, Class<? extends Channel> channelClass) - throws FailedLogCloseException, IOException { + String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class<? 
extends Channel> channelClass) throws FailedLogCloseException, IOException { super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); - this.eventLoop = eventLoop; + this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; Supplier<Boolean> hasConsumerTask; - if (eventLoop instanceof SingleThreadEventExecutor) { - - try { - Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); - field.setAccessible(true); - Queue<?> queue = (Queue<?>) field.get(eventLoop); - hasConsumerTask = () -> queue.peek() == consumer; - } catch (Exception e) { - LOG.warn("Can not get task queue of " + eventLoop + ", this is not necessary, just give up", - e); + if (conf.getBoolean(ASYNC_WAL_USE_SHARED_EVENT_LOOP, DEFAULT_ASYNC_WAL_USE_SHARED_EVENT_LOOP)) { + this.consumeExecutor = eventLoopGroup.next(); + if (consumeExecutor instanceof SingleThreadEventExecutor) { + try { + Field field = SingleThreadEventExecutor.class.getDeclaredField("taskQueue"); + field.setAccessible(true); + Queue<?> queue = (Queue<?>) field.get(consumeExecutor); + hasConsumerTask = () -> queue.peek() == consumer; + } catch (Exception e) { + LOG.warn("Can not get task queue of " + consumeExecutor + + ", this is not necessary, just give up", e); + hasConsumerTask = () -> false; + } + } else { hasConsumerTask = () -> false; } } else { - hasConsumerTask = () -> false; + ThreadPoolExecutor threadPool = + new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), + new ThreadFactoryBuilder().setNameFormat("AsyncFSWAL-%d").setDaemon(true).build()); + hasConsumerTask = () -> threadPool.getQueue().peek() == consumer; + this.consumeExecutor = threadPool; } + this.hasConsumerTask = hasConsumerTask; int preallocatedEventCount = - this.conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); + conf.getInt("hbase.regionserver.wal.disruptor.event.count", 1024 * 16); waitingConsumePayloads = - 
RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); + RingBuffer.createMultiProducer(RingBufferTruck::new, preallocatedEventCount); waitingConsumePayloadsGatingSequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE); waitingConsumePayloads.addGatingSequences(waitingConsumePayloadsGatingSequence); @@ -229,23 +253,35 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); createMaxRetries = - conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); + conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); rollWriter(); } + private static boolean waitingRoll(int epochAndState) { + return (epochAndState & 1) != 0; + } + + private static boolean writerBroken(int epochAndState) { + return ((epochAndState >>> 1) & 1) != 0; + } + + private static int epoch(int epochAndState) { + return epochAndState >>> 2; + } + // return whether we have successfully set readyForRolling to true. private boolean trySetReadyForRolling() { // Check without holding lock first. Usually we will just return here. // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to // check them outside the consumeLock. - if (!waitingRoll || !unackedAppends.isEmpty()) { + if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { return false; } consumeLock.lock(); try { // 1. a roll is requested // 2. all out-going entries have been acked(we have confirmed above). - if (waitingRoll) { + if (waitingRoll(epochAndState)) { readyForRolling = true; readyForRollingCond.signalAll(); return true; @@ -257,26 +293,25 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } } - private void syncFailed(Throwable error) { + private void syncFailed(long epochWhenSync, Throwable error) { LOG.warn("sync failed", error); - // Here we depends on the implementation of FanOutOneBlockAsyncDFSOutput and netty. 
- // When error occur, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. It - // is execute inside EventLoop. And in DefaultPromise in netty, it will notifyListener - // directly if it is already in the EventLoop thread. And in the listener method, it will - // call us. So here we know that all failed flush request will call us continuously, and - // before the last one finish, no other task can be executed in EventLoop. So here we are - // safe to use writerBroken as a guard. - // Do not forget to revisit this if we change the implementation of - // FanOutOneBlockAsyncDFSOutput! + boolean shouldRequestLogRoll = true; consumeLock.lock(); try { - if (writerBroken) { + int currentEpochAndState = epochAndState; + if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { + // this is not the previous writer which means we have already rolled the writer. + // or this is still the current writer, but we have already marked it as broken and requested + // a roll. return; } - writerBroken = true; - if (waitingRoll) { + this.epochAndState = currentEpochAndState | 0b10; + if (waitingRoll(currentEpochAndState)) { readyForRolling = true; readyForRollingCond.signalAll(); + // this means we are already in the middle of a rollWriter so just tell the roller thread + // that you can continue without requesting an extra log roll. + shouldRequestLogRoll = false; } } finally { consumeLock.unlock(); @@ -285,8 +320,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { toWriteAppends.addFirst(iter.next()); } highestUnsyncedTxid = highestSyncedTxid.get(); - // request a roll. - requestLogRoll(); + if (shouldRequestLogRoll) { + // request a roll. 
+ requestLogRoll(); + } } private void syncCompleted(AsyncWriter writer, long processedTxid, long startTimeNs) { @@ -299,30 +336,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } } postSync(System.nanoTime() - startTimeNs, finishSync(true)); - // Ideally, we should set a flag to indicate that the log roll has already been requested for - // the current writer and give up here, and reset the flag when roll is finished. But we - // finish roll in the log roller thread so the flag need to be set by different thread which - // typically means we need to use a lock to protect it and do fencing. As the log roller will - // aggregate the roll requests of the same WAL, so it is safe to call requestLogRoll multiple - // times before the roll actual happens. But we need to stop if we set readyForRolling to true - // and wake up the log roller thread waiting in waitForSafePoint as the rollWriter call may - // return firstly and then we run the code below and request a roll on the new writer. if (trySetReadyForRolling()) { // we have just finished a roll, then do not need to check for log rolling, the writer will be // closed soon. 
return; } - if (writer.getLength() < logrollsize) { + if (writer.getLength() < logrollsize || rollRequested) { return; } - if (!rollWriterLock.tryLock()) { - return; - } - try { - requestLogRoll(); - } finally { - rollWriterLock.unlock(); - } + rollRequested = true; + requestLogRoll(); } private void sync(AsyncWriter writer) { @@ -330,19 +353,20 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { long currentHighestProcessedAppendTxid = highestProcessedAppendTxid; highestProcessedAppendTxidAtLastSync = currentHighestProcessedAppendTxid; final long startTimeNs = System.nanoTime(); - writer.sync().whenComplete((result, error) -> { + final long epoch = epochAndState >>> 2; + writer.sync().whenCompleteAsync((result, error) -> { if (error != null) { - syncFailed(error); + syncFailed(epoch, error); } else { syncCompleted(writer, currentHighestProcessedAppendTxid, startTimeNs); } - }); + }, consumeExecutor); } private void addTimeAnnotation(SyncFuture future, String annotation) { TraceUtil.addTimelineAnnotation(annotation); - //TODO handle htrace API change, see HBASE-18895 - //future.setSpan(scope.getSpan()); + // TODO handle htrace API change, see HBASE-18895 + // future.setSpan(scope.getSpan()); } private int finishSyncLowerThanTxid(long txid, boolean addSyncTrace) { @@ -410,26 +434,10 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { for (Iterator<FSWALEntry> iter = toWriteAppends.iterator(); iter.hasNext();) { FSWALEntry entry = iter.next(); boolean appended; - Span span = entry.detachSpan(); - // the span maybe null if this is a retry after rolling. 
- if (span != null) { - //TODO handle htrace API change, see HBASE-18895 - //TraceScope scope = Trace.continueSpan(span); - try { - appended = append(writer, entry); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } finally { - //TODO handle htrace API change, see HBASE-18895 - //assert scope == NullScope.INSTANCE || !scope.isDetached(); - //scope.close(); // append scope is complete - } - } else { - try { - appended = append(writer, entry); - } catch (IOException e) { - throw new AssertionError("should not happen", e); - } + try { + appended = append(writer, entry); + } catch (IOException e) { + throw new AssertionError("should not happen", e); } newHighestProcessedAppendTxid = entry.getTxid(); iter.remove(); @@ -472,10 +480,11 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { private void consume() { consumeLock.lock(); try { - if (writerBroken) { + int currentEpochAndState = epochAndState; + if (writerBroken(currentEpochAndState)) { return; } - if (waitingRoll) { + if (waitingRoll(currentEpochAndState)) { if (writer.getLength() > fileLengthAtLastSync) { // issue a sync sync(writer); @@ -491,8 +500,8 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { consumeLock.unlock(); } long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; - for (long cursorBound = - waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) { + for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; + nextCursor++) { if (!waitingConsumePayloads.isPublished(nextCursor)) { break; } @@ -540,11 +549,12 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } } // reschedule if we still have something to write. 
- eventLoop.execute(consumer); + consumeExecutor.execute(consumer); } private boolean shouldScheduleConsumer() { - if (writerBroken || waitingRoll) { + int currentEpochAndState = epochAndState; + if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { return false; } return consumerScheduled.compareAndSet(false, true); @@ -554,16 +564,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { public long append(RegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore) throws IOException { long txid = - stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); + stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads); if (shouldScheduleConsumer()) { - eventLoop.execute(consumer); + consumeExecutor.execute(consumer); } return txid; } @Override public void sync() throws IOException { - try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")){ + try (TraceScope scope = TraceUtil.createTrace("AsyncFSWAL.sync")) { long txid = waitingConsumePayloads.next(); SyncFuture future; try { @@ -574,7 +584,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { waitingConsumePayloads.publish(txid); } if (shouldScheduleConsumer()) { - eventLoop.execute(consumer); + consumeExecutor.execute(consumer); } blockOnSync(future); } @@ -597,7 +607,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { waitingConsumePayloads.publish(sequence); } if (shouldScheduleConsumer()) { - eventLoop.execute(consumer); + consumeExecutor.execute(consumer); } blockOnSync(future); } @@ -608,7 +618,7 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { boolean overwrite = false; for (int retry = 0;; retry++) { try { - return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop, + return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoopGroup, channelClass); } catch (RemoteException e) { LOG.warn("create wal log writer " + 
path + " failed, retry = " + retry, e); @@ -643,20 +653,21 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { } } } - throw new IOException("Failed to create wal log writer " + path + " after retrying " - + createMaxRetries + " time(s)"); + throw new IOException("Failed to create wal log writer " + path + " after retrying " + + createMaxRetries + " time(s)"); } private void waitForSafePoint() { consumeLock.lock(); try { - if (writerBroken || this.writer == null) { + int currentEpochAndState = epochAndState; + if (writerBroken(currentEpochAndState) || this.writer == null) { return; } consumerScheduled.set(true); - waitingRoll = true; + epochAndState = currentEpochAndState | 1; readyForRolling = false; - eventLoop.execute(consumer); + consumeExecutor.execute(consumer); while (!readyForRolling) { readyForRollingCond.awaitUninterruptibly(); } @@ -674,13 +685,17 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { if (nextWriter != null && nextWriter instanceof AsyncProtobufLogWriter) { this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); } - this.fileLengthAtLastSync = 0L; + this.fileLengthAtLastSync = nextWriter.getLength(); + this.rollRequested = false; this.highestProcessedAppendTxidAtLastSync = 0L; consumeLock.lock(); try { consumerScheduled.set(true); - writerBroken = waitingRoll = false; - eventLoop.execute(consumer); + int currentEpoch = epochAndState >>> 2; + int nextEpoch = currentEpoch == MAX_EPOCH ? 
0 : currentEpoch + 1; + // set a new epoch and also clear waitingRoll and writerBroken + this.epochAndState = nextEpoch << 2; + consumeExecutor.execute(consumer); } finally { consumeLock.unlock(); } @@ -710,6 +725,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { closeExecutor.shutdown(); IOException error = new IOException("WAL has been closed"); syncFutures.forEach(f -> f.done(f.getTxid(), error)); + if (!(consumeExecutor instanceof EventLoop)) { + consumeExecutor.shutdown(); + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java index f3c5bf2..482500e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java @@ -17,11 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; - -import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; - import java.io.IOException; import java.io.InterruptedIOException; import java.io.OutputStream; @@ -35,15 +30,19 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.ByteBufferWriter; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutputHelper; -import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; import org.apache.hadoop.hbase.util.CommonFSUtils.StreamLacksCapabilityException; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WAL.Entry; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; +import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALTrailer; /** * AsyncWriter for protobuf-based WAL. @@ -54,7 +53,7 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class); - private final EventLoop eventLoop; + private final EventLoopGroup eventLoopGroup; private final Class<? extends Channel> channelClass; @@ -103,8 +102,9 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter private OutputStream asyncOutputWrapper; - public AsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) { - this.eventLoop = eventLoop; + public AsyncProtobufLogWriter(EventLoopGroup eventLoopGroup, + Class<? 
extends Channel> channelClass) { + this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; } @@ -156,13 +156,13 @@ public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, short replication, long blockSize) throws IOException, StreamLacksCapabilityException { this.output = AsyncFSOutputHelper.createOutput(fs, path, overwritable, false, replication, - blockSize, eventLoop, channelClass); + blockSize, eventLoopGroup, channelClass); this.asyncOutputWrapper = new OutputStreamWrapper(output); } private long write(Consumer<CompletableFuture<Long>> action) throws IOException { CompletableFuture<Long> future = new CompletableFuture<>(); - eventLoop.execute(() -> action.accept(future)); + action.accept(future); try { return future.get().longValue(); } catch (InterruptedException e) { http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java index debe9e4..a928ad5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSWALEntry.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.util.CollectionUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.htrace.core.Span; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting; @@ -58,9 +57,6 @@ class FSWALEntry extends Entry { private final transient RegionInfo regionInfo; private 
final transient Set<byte[]> familyNames; - // The tracing span for this entry when writing WAL. - private transient Span span; - FSWALEntry(final long txid, final WALKey key, final WALEdit edit, final RegionInfo regionInfo, final boolean inMemstore) { super(key, edit); @@ -130,14 +126,4 @@ class FSWALEntry extends Entry { Set<byte[]> getFamilyNames() { return familyNames; } - - void attachSpan(Span span) { - this.span = span; - } - - Span detachSpan() { - Span span = this.span; - this.span = null; - return span; - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java index 021f6a1..dfef429 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/RingBufferTruck.java @@ -18,7 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver.wal; -import org.apache.htrace.core.Span; import org.apache.yetus.audience.InterfaceAudience; /** @@ -43,10 +42,9 @@ final class RingBufferTruck { private FSWALEntry entry; /** - * Load the truck with a {@link FSWALEntry} and associated {@link Span}. + * Load the truck with a {@link FSWALEntry}. 
*/ - void load(FSWALEntry entry, Span span) { - entry.attachSpan(span); + void load(FSWALEntry entry) { this.entry = entry; this.type = Type.APPEND; } http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java index a686a1b..fd7387b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SecureAsyncProtobufLogWriter.java @@ -21,20 +21,21 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.io.crypto.Encryptor; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; +import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALHeader; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) public class SecureAsyncProtobufLogWriter extends AsyncProtobufLogWriter { private Encryptor encryptor = null; - public SecureAsyncProtobufLogWriter(EventLoop eventLoop, Class<? extends Channel> channelClass) { - super(eventLoop, channelClass); + public SecureAsyncProtobufLogWriter(EventLoopGroup eventLoopGroup, + Class<? 
extends Channel> channelClass) { + super(eventLoopGroup, channelClass); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index bf3b2ad..5cb0189 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -34,7 +34,6 @@ import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables; import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel; -import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop; import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoopGroup; import org.apache.hadoop.hbase.shaded.io.netty.channel.nio.NioEventLoopGroup; import org.apache.hadoop.hbase.shaded.io.netty.channel.socket.nio.NioSocketChannel; @@ -69,7 +68,7 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, - eventLoopGroup.next(), channelClass); + eventLoopGroup, channelClass); } @Override @@ -90,23 +89,23 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { * public because of AsyncFSWAL. Should be package-private */ public static AsyncWriter createAsyncWriter(Configuration conf, FileSystem fs, Path path, - boolean overwritable, EventLoop eventLoop, Class<? extends Channel> channelClass) + boolean overwritable, EventLoopGroup eventLoopGroup, Class<? 
extends Channel> channelClass) throws IOException { // Configuration already does caching for the Class lookup. Class<? extends AsyncWriter> logWriterClass = conf.getClass( "hbase.regionserver.hlog.async.writer.impl", AsyncProtobufLogWriter.class, AsyncWriter.class); try { - AsyncWriter writer = logWriterClass.getConstructor(EventLoop.class, Class.class) - .newInstance(eventLoop, channelClass); + AsyncWriter writer = logWriterClass.getConstructor(EventLoopGroup.class, Class.class) + .newInstance(eventLoopGroup, channelClass); writer.init(fs, path, conf, overwritable); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { LOG.error("The RegionServer async write ahead log provider " + - "relies on the ability to call " + e.getMessage() + " for proper operation during " + - "component failures, but the current FileSystem does not support doing so. Please " + - "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + - "it points to a FileSystem mount that has suitable capabilities for output streams."); + "relies on the ability to call " + e.getMessage() + " for proper operation during " + + "component failures, but the current FileSystem does not support doing so. 
Please " + + "check the config value of '" + CommonFSUtils.HBASE_WAL_DIR + "' and ensure " + + "it points to a FileSystem mount that has suitable capabilities for output streams."); } else { LOG.debug("Error instantiating log writer.", e); } http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java index 2ae916f..75a64aa 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncFSWAL.java @@ -63,7 +63,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix) throws IOException { return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP.next(), CHANNEL_CLASS); + suffix, GROUP, CHANNEL_CLASS); } @Override @@ -72,7 +72,7 @@ public class TestAsyncFSWAL extends AbstractTestFSWAL { boolean failIfWALExists, String prefix, String suffix, final Runnable action) throws IOException { return new AsyncFSWAL(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, - suffix, GROUP.next(), CHANNEL_CLASS) { + suffix, GROUP, CHANNEL_CLASS) { @Override void atHeadOfRingBufferEventHandlerAppend() { http://git-wip-us.apache.org/repos/asf/hbase/blob/df366881/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java index 881cf7c..8de15df 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncWALReplay.java @@ -62,6 +62,6 @@ public class TestAsyncWALReplay extends AbstractTestWALReplay { @Override protected WAL createWAL(Configuration c, Path hbaseRootDir, String logName) throws IOException { return new AsyncFSWAL(FileSystem.get(c), hbaseRootDir, logName, - HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP.next(), CHANNEL_CLASS); + HConstants.HREGION_OLDLOGDIR_NAME, c, null, true, null, null, GROUP, CHANNEL_CLASS); } }