HBASE-17090 Procedure v2 - fast wake if nothing else is running (Matteo Bertozzi)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/da97569e Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/da97569e Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/da97569e Branch: refs/heads/hbase-12439 Commit: da97569eae662ad90fd3afd98ef148c94eee4ac1 Parents: 306ef83 Author: Michael Stack <st...@apache.org> Authored: Tue Dec 27 16:19:32 2016 -0800 Committer: Michael Stack <st...@apache.org> Committed: Tue Dec 27 16:19:32 2016 -0800 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 4 +- .../procedure2/store/NoopProcedureStore.java | 5 +++ .../hbase/procedure2/store/ProcedureStore.java | 6 +++ .../procedure2/store/wal/WALProcedureStore.java | 42 ++++++++++++++------ .../wal/ProcedureWALPerformanceEvaluation.java | 17 +++----- 5 files changed, 48 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 80c3804..c65f3fb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1536,7 +1536,7 @@ public class ProcedureExecutor<TEnvironment> { final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); if (procedure == null) continue; - activeExecutorCount.incrementAndGet(); + store.setRunningProcedureCount(activeExecutorCount.incrementAndGet()); executionStartTime.set(EnvironmentEdgeManager.currentTime()); 
try { if (isTraceEnabled) { @@ -1544,7 +1544,7 @@ public class ProcedureExecutor<TEnvironment> { } executeProcedure(procedure); } finally { - activeExecutorCount.decrementAndGet(); + store.setRunningProcedureCount(activeExecutorCount.decrementAndGet()); lastUpdate = EnvironmentEdgeManager.currentTime(); executionStartTime.set(Long.MAX_VALUE); } http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index f248dc3..c03e326 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -52,6 +52,11 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override + public void setRunningProcedureCount(final int count) { + // no-op + } + + @Override public void load(final ProcedureLoader loader) throws IOException { loader.setMaxProcId(0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index e47ed63..032c8fc 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -151,6 +151,12 @@ public interface ProcedureStore { int getNumThreads(); 
/** + * Set the number of running procedures. + * This can be used, for example, by the store to know how long to wait before a sync. + */ + void setRunningProcedureCount(int count); + + /** * Acquire the lease for the procedure store. */ void recoverLease() throws IOException; http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 922b681..4465993 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -136,7 +136,9 @@ public class WALProcedureStore extends ProcedureStoreBase { private LinkedTransferQueue<ByteSlot> slotsCache = null; private Set<ProcedureWALFile> corruptedLogs = null; private FSDataOutputStream stream = null; + private int runningProcCount = 1; private long flushLogId = 0; + private int syncMaxSlot = 1; private int slotIndex = 0; private Thread syncThread; private ByteSlot[] slots; @@ -198,6 +200,8 @@ public class WALProcedureStore extends ProcedureStoreBase { // Init buffer slots loading.set(true); + runningProcCount = numSlots; + syncMaxSlot = numSlots; slots = new ByteSlot[numSlots]; slotsCache = new LinkedTransferQueue(); while (slotsCache.size() < numSlots) { @@ -288,6 +292,12 @@ public class WALProcedureStore extends ProcedureStoreBase { return slots == null ? 0 : slots.length; } + @Override + public void setRunningProcedureCount(final int count) { + LOG.debug("set running procedure count=" + count + " slots=" + slots.length); + this.runningProcCount = count > 0 ? 
Math.min(count, slots.length) : slots.length; + } + public ProcedureStoreTracker getStoreTracker() { return storeTracker; } @@ -623,7 +633,7 @@ public class WALProcedureStore extends ProcedureStoreBase { throw new RuntimeException("sync aborted", syncException.get()); } else if (inSync.get()) { syncCond.await(); - } else if (slotIndex == slots.length) { + } else if (slotIndex >= syncMaxSlot) { slotCond.signal(); syncCond.await(); } else { @@ -642,7 +652,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } // Notify that the slots are full - if (slotIndex == slots.length) { + if (slotIndex == syncMaxSlot) { waitCond.signal(); slotCond.signal(); } @@ -725,8 +735,10 @@ public class WALProcedureStore extends ProcedureStoreBase { } } // Wait SYNC_WAIT_MSEC or the signal of "slots full" before flushing + syncMaxSlot = runningProcCount; + assert syncMaxSlot > 0 : "unexpected syncMaxSlot=" + syncMaxSlot; final long syncWaitSt = System.currentTimeMillis(); - if (slotIndex != slots.length) { + if (slotIndex != syncMaxSlot) { slotCond.await(syncWaitMsec, TimeUnit.MILLISECONDS); } @@ -734,7 +746,7 @@ public class WALProcedureStore extends ProcedureStoreBase { final long syncWaitMs = currentTs - syncWaitSt; final float rollSec = getMillisFromLastRoll() / 1000.0f; final float syncedPerSec = totalSyncedToStore / rollSec; - if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < slots.length)) { + if (LOG.isTraceEnabled() && (syncWaitMs > 10 || slotIndex < syncMaxSlot)) { LOG.trace(String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", StringUtils.humanTimeDiff(syncWaitMs), slotIndex, StringUtils.humanSize(totalSyncedToStore), @@ -813,29 +825,33 @@ public class WALProcedureStore extends ProcedureStoreBase { return totalSynced; } - protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) - throws IOException { + protected long syncSlots(final FSDataOutputStream stream, final ByteSlot[] slots, + final int offset, 
final int count) throws IOException { long totalSynced = 0; for (int i = 0; i < count; ++i) { - ByteSlot data = slots[offset + i]; + final ByteSlot data = slots[offset + i]; data.writeTo(stream); totalSynced += data.size(); } - if (useHsync) { - stream.hsync(); - } else { - stream.hflush(); - } + syncStream(stream); sendPostSyncSignal(); if (LOG.isTraceEnabled()) { - LOG.trace("Sync slots=" + count + '/' + slots.length + + LOG.trace("Sync slots=" + count + '/' + syncMaxSlot + ", flushed=" + StringUtils.humanSize(totalSynced)); } return totalSynced; } + protected void syncStream(final FSDataOutputStream stream) throws IOException { + if (useHsync) { + stream.hsync(); + } else { + stream.hflush(); + } + } + private boolean rollWriterWithRetries() { for (int i = 0; i < rollRetries && isRunning(); ++i) { if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); http://git-wip-us.apache.org/repos/asf/hbase/blob/da97569e/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java index 363574b..641ac8e 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALPerformanceEvaluation.java @@ -149,7 +149,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { // Start worker threads. 
long start = System.currentTimeMillis(); for (int i = 0; i < numThreads; i++) { - futures[i] = executor.submit(this.new Worker(start)); + futures[i] = executor.submit(new Worker(start)); } boolean failure = false; try { @@ -197,8 +197,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { * If procedure store fails to roll log file (throws IOException), all threads quit, and at * least one returns value of {@link AbstractHBaseTool#EXIT_FAILURE}. */ - class Worker implements Callable<Integer> { - final long start; + private final class Worker implements Callable<Integer> { + private final long start; public Worker(long start) { this.start = start; @@ -243,7 +243,7 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } } - public class NoSyncWalProcedureStore extends WALProcedureStore { + private class NoSyncWalProcedureStore extends WALProcedureStore { public NoSyncWalProcedureStore(final Configuration conf, final FileSystem fs, final Path logDir) { super(conf, fs, logDir, new WALProcedureStore.LeaseRecovery() { @@ -255,13 +255,8 @@ public class ProcedureWALPerformanceEvaluation extends AbstractHBaseTool { } @Override - protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) - throws IOException { - long totalSynced = 0; - for (int i = 0; i < count; ++i) { - totalSynced += slots[offset + i].size(); - } - return totalSynced; + protected void syncStream(FSDataOutputStream stream) { + // no-op } }