HBASE-16781 Fix flaky TestMasterProcedureWalLease
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/29d701a3 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/29d701a3 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/29d701a3 Branch: refs/heads/hbase-12439 Commit: 29d701a314b6bf56771a217b42c4c10832b15753 Parents: c7cae6b Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Fri Oct 7 17:32:19 2016 -0700 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Fri Oct 7 18:01:53 2016 -0700 ---------------------------------------------------------------------- .../procedure2/store/wal/WALProcedureStore.java | 41 +++++++++++++------- .../hadoop/hbase/master/MasterServices.java | 5 +++ .../master/procedure/MasterProcedureEnv.java | 7 +++- .../hbase/master/MockNoopMasterServices.java | 5 +++ .../MasterProcedureTestingUtility.java | 1 + 5 files changed, 43 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 36cf7af..1e60402 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -122,6 +122,7 @@ public class WALProcedureStore extends ProcedureStoreBase { private final AtomicBoolean inSync = new AtomicBoolean(false); private final AtomicLong totalSynced = new AtomicLong(0); private final AtomicLong lastRollTs = new AtomicLong(0); + private final AtomicLong syncId = new AtomicLong(0); private LinkedTransferQueue<ByteSlot> slotsCache = null; private Set<ProcedureWALFile> corruptedLogs = null; @@ -226,15 +227,15 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public void stop(boolean abort) { + public void stop(final boolean abort) { if (!setRunning(false)) { return; } - LOG.info("Stopping the WAL Procedure Store"); + LOG.info("Stopping the WAL Procedure Store, isAbort=" + abort + + (isSyncAborted() ? " (self aborting)" : "")); sendStopSignal(); - - if (!abort) { + if (!isSyncAborted()) { try { while (syncThread.isAlive()) { sendStopSignal(); @@ -525,6 +526,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } + final long pushSyncId = syncId.get(); updateStoreTracker(type, procId, subProcIds); slots[slotIndex++] = slot; logId = flushLogId; @@ -540,7 +542,9 @@ public class WALProcedureStore extends ProcedureStoreBase { slotCond.signal(); } - syncCond.await(); + while (pushSyncId == syncId.get() && isRunning()) { + syncCond.await(); + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); sendAbortProcessSignal(); @@ -642,13 +646,15 @@ public class WALProcedureStore extends ProcedureStoreBase { totalSyncedToStore = totalSynced.addAndGet(slotSize); slotIndex = 0; inSync.set(false); + syncId.incrementAndGet(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - sendAbortProcessSignal(); syncException.compareAndSet(null, e); + sendAbortProcessSignal(); throw e; } catch (Throwable t) { syncException.compareAndSet(null, t); + sendAbortProcessSignal(); throw t; } finally { syncCond.signalAll(); @@ -679,13 +685,12 @@ public class WALProcedureStore extends ProcedureStoreBase { } catch (Throwable e) { LOG.warn("unable to sync slots, retry=" + retry); if (++retry >= maxRetriesBeforeRoll) { - if (logRolled >= maxSyncFailureRoll) { + if (logRolled >= maxSyncFailureRoll && isRunning()) { LOG.error("Sync slots after log roll failed, abort.", e); - sendAbortProcessSignal(); throw e; } - if (!rollWriterOrDie()) { + if (!rollWriterWithRetries()) { throw e; } @@ -720,8 +725,8 @@ public class WALProcedureStore extends ProcedureStoreBase { return totalSynced; } - private boolean rollWriterOrDie() { - for (int i = 0; i < rollRetries; ++i) { + private boolean rollWriterWithRetries() { + for (int i = 0; i < rollRetries && isRunning(); ++i) { if (i > 0) Threads.sleepWithoutInterrupt(waitBeforeRoll * i); try { @@ -733,8 +738,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } } LOG.fatal("Unable to roll the log"); - sendAbortProcessSignal(); - throw new RuntimeException("unable to roll the log"); + return false; } private boolean tryRollWriter() { @@ -777,7 +781,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - @VisibleForTesting void removeInactiveLogsForTesting() throws Exception { + @VisibleForTesting + protected void removeInactiveLogsForTesting() throws Exception { lock.lock(); try { removeInactiveLogs(); @@ -812,6 +817,8 @@ public class WALProcedureStore extends ProcedureStoreBase { } private boolean rollWriter() throws IOException { + if (!isRunning()) return false; + // Create new state-log if (!rollWriter(flushLogId + 1)) { LOG.warn("someone else has already created log " + flushLogId); @@ -1043,6 +1050,10 @@ public class WALProcedureStore extends ProcedureStoreBase { for (int i = 0; i < logFiles.length; ++i) { final Path logPath = logFiles[i].getPath(); leaseRecovery.recoverFileLease(fs, logPath); + if (!isRunning()) { + throw new IOException("wal aborting"); + } + maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); ProcedureWALFile log = initOldLog(logFiles[i]); if (log != null) { @@ -1061,7 +1072,7 @@ public class WALProcedureStore extends ProcedureStoreBase { * it using entries in the log. */ private void initTrackerFromOldLogs() { - if (logs.isEmpty()) return; + if (logs.isEmpty() || !isRunning()) return; ProcedureWALFile log = logs.getLast(); if (!log.getTracker().isPartial()) { storeTracker.resetTo(log.getTracker()); http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 670642f..9bdcf76 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -374,4 +374,9 @@ public interface MasterServices extends Server { * @return load balancer */ public LoadBalancer getLoadBalancer(); + + /** + * @return True if this master is stopping. + */ + boolean isStopping(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java index 213f80c..e90813c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureEnv.java @@ -61,10 +61,15 @@ public class MasterProcedureEnv { @Override public boolean progress() { LOG.debug("Recover Procedure Store log lease: " + path); - return master.isActiveMaster(); + return isRunning(); } }); } + + private boolean isRunning() { + return master.isActiveMaster() && !master.isStopped() && + !master.isStopping() && !master.isAborted(); + } } @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java index 56a8522..87fb169 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockNoopMasterServices.java @@ -180,6 +180,11 @@ public class MockNoopMasterServices implements MasterServices, Server { } @Override + public boolean isStopping() { + return stopped; + } + + @Override public boolean isStopped() { return stopped; } http://git-wip-us.apache.org/repos/asf/hbase/blob/29d701a3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java index 9b69830..c3cb2da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureTestingUtility.java @@ -398,6 +398,7 @@ public class MasterProcedureTestingUtility { // restart executor/store // rollback step N - save on store InjectAbortOnLoadListener abortListener = new InjectAbortOnLoadListener(procExec); + abortListener.addProcId(procId); procExec.registerListener(abortListener); try { for (int i = 0; !procExec.isFinished(procId); ++i) {