HBASE-20708 Remove the usage of RecoverMetaProcedure in master startup
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6dbbd78a Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6dbbd78a Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6dbbd78a Branch: refs/heads/HBASE-19064 Commit: 6dbbd78aa00ed4877292d9cd48621803a175d51a Parents: b336da9 Author: zhangduo <zhang...@apache.org> Authored: Tue Jun 19 15:02:10 2018 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Tue Jun 19 15:02:10 2018 +0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/MetaTableAccessor.java | 10 +- .../hbase/procedure2/ProcedureExecutor.java | 45 +-- .../procedure2/ProcedureTestingUtility.java | 10 +- .../hbase/procedure2/TestChildProcedures.java | 4 +- .../hbase/procedure2/TestProcedureEvents.java | 4 +- .../procedure2/TestProcedureExecution.java | 4 +- .../hbase/procedure2/TestProcedureExecutor.java | 5 +- .../procedure2/TestProcedureInMemoryChore.java | 5 +- .../hbase/procedure2/TestProcedureMetrics.java | 2 +- .../hbase/procedure2/TestProcedureNonce.java | 4 +- .../hbase/procedure2/TestProcedureRecovery.java | 4 +- .../procedure2/TestProcedureReplayOrder.java | 6 +- .../procedure2/TestProcedureSuspended.java | 4 +- .../procedure2/TestStateMachineProcedure.java | 4 +- .../hbase/procedure2/TestYieldProcedures.java | 6 +- .../src/main/protobuf/MasterProcedure.proto | 15 +- .../hadoop/hbase/master/CatalogJanitor.java | 13 +- .../org/apache/hadoop/hbase/master/HMaster.java | 180 ++++++------ .../hbase/master/MasterMetaBootstrap.java | 41 +-- .../hadoop/hbase/master/MasterServices.java | 12 - .../hadoop/hbase/master/MasterWalManager.java | 28 +- .../hbase/master/RegionServerTracker.java | 30 +- .../hadoop/hbase/master/ServerManager.java | 163 ++--------- .../master/assignment/AssignmentManager.java | 293 +++++++------------ .../assignment/MergeTableRegionsProcedure.java | 2 +- .../master/assignment/RegionStateStore.java | 21 +- 
.../assignment/RegionTransitionProcedure.java | 11 +- .../assignment/SplitTableRegionProcedure.java | 2 +- .../AbstractStateMachineTableProcedure.java | 6 +- .../master/procedure/InitMetaProcedure.java | 115 ++++++++ .../master/procedure/MasterProcedureEnv.java | 16 +- .../procedure/MasterProcedureScheduler.java | 27 +- .../procedure/MetaProcedureInterface.java | 5 + .../hbase/master/procedure/MetaQueue.java | 5 + .../master/procedure/RecoverMetaProcedure.java | 9 +- .../hbase/master/procedure/SchemaLocking.java | 5 + .../master/procedure/ServerCrashProcedure.java | 61 ++-- .../hadoop/hbase/TestMetaTableAccessor.java | 2 - .../hbase/master/MockNoopMasterServices.java | 19 -- .../hadoop/hbase/master/TestCatalogJanitor.java | 3 +- .../master/assignment/MockMasterServices.java | 25 +- .../assignment/TestAssignmentManager.java | 17 +- .../MasterProcedureTestingUtility.java | 5 - .../procedure/TestMasterProcedureEvents.java | 76 +---- .../procedure/TestServerCrashProcedure.java | 32 +- 45 files changed, 610 insertions(+), 746 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java index 91f3cf7..60afaca 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/MetaTableAccessor.java @@ -1346,9 +1346,17 @@ public class MetaTableAccessor { */ public static void putsToMetaTable(final Connection connection, final List<Put> ps) throws IOException { + if (ps.isEmpty()) { + return; + } try (Table t = getMetaHTable(connection)) { debugLogMutations(ps); - t.put(ps); + // the implementation for putting a 
single Put is much simpler so here we do a check first. + if (ps.size() == 1) { + t.put(ps.get(0)); + } else { + t.put(ps); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 3a75d33..bd0a191 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -511,21 +511,16 @@ public class ProcedureExecutor<TEnvironment> { } /** - * Start the procedure executor. - * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to - * recover the lease, and ensure a single executor, and start the procedure - * replay to resume and recover the previous pending and in-progress perocedures. - * + * Initialize the procedure executor, but do not start workers. We will start them later. + * <p/> + * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and + * ensure a single executor, and start the procedure replay to resume and recover the previous + * pending and in-progress procedures. * @param numThreads number of threads available for procedure execution. - * @param abortOnCorruption true if you want to abort your service in case - * a corrupted procedure is found on replay. otherwise false. + * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure + * is found on replay. otherwise false. 
*/ - public void start(int numThreads, boolean abortOnCorruption) throws IOException { - if (!running.compareAndSet(false, true)) { - LOG.warn("Already running"); - return; - } - + public void init(int numThreads, boolean abortOnCorruption) throws IOException { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; @@ -546,11 +541,11 @@ public class ProcedureExecutor<TEnvironment> { long st, et; // Acquire the store lease. - st = EnvironmentEdgeManager.currentTime(); + st = System.nanoTime(); store.recoverLease(); - et = EnvironmentEdgeManager.currentTime(); + et = System.nanoTime(); LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(), - StringUtils.humanTimeDiff(et - st)); + StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); // start the procedure scheduler scheduler.start(); @@ -560,12 +555,21 @@ public class ProcedureExecutor<TEnvironment> { // The first one will make sure that we have the latest id, // so we can start the threads and accept new procedures. // The second step will do the actual load of old procedures. - st = EnvironmentEdgeManager.currentTime(); + st = System.nanoTime(); load(abortOnCorruption); - et = EnvironmentEdgeManager.currentTime(); + et = System.nanoTime(); LOG.info("Loaded {} in {}", store.getClass().getSimpleName(), - StringUtils.humanTimeDiff(et - st)); + StringUtils.humanTimeDiff(TimeUnit.NANOSECONDS.toMillis(et - st))); + } + /** + * Start the workers. + */ + public void startWorkers() throws IOException { + if (!running.compareAndSet(false, true)) { + LOG.warn("Already running"); + return; + } // Start the executors. Here we must have the lastProcId set. 
LOG.trace("Start workers {}", workerThreads.size()); timeoutExecutor.start(); @@ -861,7 +865,6 @@ public class ProcedureExecutor<TEnvironment> { justification = "FindBugs is blind to the check-for-null") public long submitProcedure(final Procedure proc, final NonceKey nonceKey) { Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(isRunning(), "executor not running"); prepareProcedure(proc); @@ -895,7 +898,6 @@ public class ProcedureExecutor<TEnvironment> { // TODO: Do we need to take nonces here? public void submitProcedures(final Procedure[] procs) { Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(isRunning(), "executor not running"); if (procs == null || procs.length <= 0) { return; } @@ -919,7 +921,6 @@ public class ProcedureExecutor<TEnvironment> { private Procedure prepareProcedure(final Procedure proc) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); - Preconditions.checkArgument(isRunning(), "executor not running"); Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); if (this.checkOwnerSet) { Preconditions.checkArgument(proc.hasOwner(), "missing owner"); http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index d29e376..e8d72f9 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -70,6 +70,12 @@ public class ProcedureTestingUtility { restart(procExecutor, false, true, null, null); } + public static void 
initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, + boolean abortOnCorruption) throws IOException { + procExecutor.init(numThreads, abortOnCorruption); + procExecutor.startWorkers(); + } + public static <TEnv> void restart(final ProcedureExecutor<TEnv> procExecutor, final boolean avoidTestKillDuringRestart, final boolean failOnCorrupted, final Callable<Void> stopAction, final Callable<Void> startAction) @@ -98,7 +104,7 @@ public class ProcedureTestingUtility { // re-start LOG.info("RESTART - Start"); procStore.start(storeThreads); - procExecutor.start(execThreads, failOnCorrupted); + initAndStartWorkers(procExecutor, execThreads, failOnCorrupted); if (startAction != null) { startAction.call(); } @@ -183,7 +189,7 @@ public class ProcedureTestingUtility { NoopProcedureStore procStore = new NoopProcedureStore(); ProcedureExecutor<TEnv> procExecutor = new ProcedureExecutor<>(conf, env, procStore); procStore.start(1); - procExecutor.start(1, false); + initAndStartWorkers(procExecutor, 1, false); try { return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java index 3d99b31..cce4caf 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -66,10 +66,10 @@ public class TestChildProcedures { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); - procExecutor = new 
ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java index b7c59c8..8351e4c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureEvents.java @@ -67,9 +67,9 @@ public class TestProcedureEvents { procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procStore.start(1); - procExecutor.start(1, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java index 7e660e4..a3cff58 100644 --- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecution.java @@ -71,9 +71,9 @@ public class TestProcedureExecution { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), null, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), null, procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java index 1c53098..7f130ca 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java @@ -24,7 +24,6 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtility; -import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; import org.apache.hadoop.hbase.testclassification.MasterTests; @@ -71,8 +70,8 @@ public class TestProcedureExecutor { } private void createNewExecutor(final Configuration conf, final int 
numThreads) throws Exception { - procExecutor = new ProcedureExecutor(conf, procEnv, procStore); - procExecutor.start(numThreads, true); + procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java index 86293e1..75c8d16 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java @@ -53,17 +53,16 @@ public class TestProcedureInMemoryChore { private HBaseCommonTestingUtility htu; - @SuppressWarnings("rawtypes") @Before public void setUp() throws IOException { htu = new HBaseCommonTestingUtility(); procEnv = new TestProcEnv(); procStore = new NoopProcedureStore(); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java ---------------------------------------------------------------------- diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java index 94a293d..2acb7dd 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureMetrics.java @@ -75,7 +75,7 @@ public class TestProcedureMetrics { procExecutor = new ProcedureExecutor<TestProcEnv>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java index b702314..2bf11fb 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureNonce.java @@ -72,10 +72,10 @@ public class TestProcedureNonce { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + 
ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index aece1de..532fcf3 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -76,10 +76,10 @@ public class TestProcedureRecovery { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcEnv(); procStore = ProcedureTestingUtility.createStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); procSleepInterval = 0; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java index 7d0529e..319ddb2 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java +++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureReplayOrder.java @@ -53,7 +53,7 @@ public class TestProcedureReplayOrder { private static final int NUM_THREADS = 16; - private ProcedureExecutor<Void> procExecutor; + private ProcedureExecutor<TestProcedureEnv> procExecutor; private TestProcedureEnv procEnv; private ProcedureStore procStore; @@ -74,9 +74,9 @@ public class TestProcedureReplayOrder { logDir = new Path(testDir, "proc-logs"); procEnv = new TestProcedureEnv(); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), procEnv, procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procStore.start(NUM_THREADS); - procExecutor.start(1, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, 1, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index 3da7c11..a9e919c 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -60,9 +60,9 @@ public class TestProcedureSuspended { htu = new HBaseCommonTestingUtility(); procStore = new NoopProcedureStore(); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + 
ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java index 69b2208..19ef4bb 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestStateMachineProcedure.java @@ -81,9 +81,9 @@ public class TestStateMachineProcedure { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), procStore); + procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index 7fa7682..8d9b325 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -71,10 +71,10 @@ 
public class TestYieldProcedures { logDir = new Path(testDir, "proc-logs"); procStore = ProcedureTestingUtility.createWalStore(htu.getConfiguration(), logDir); procRunnables = new TestScheduler(); - procExecutor = new ProcedureExecutor(htu.getConfiguration(), new TestProcEnv(), - procStore, procRunnables); + procExecutor = + new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - procExecutor.start(PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto ---------------------------------------------------------------------- diff --git a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto index b65551f..0b4e1d7 100644 --- a/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto +++ b/hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto @@ -295,15 +295,17 @@ message RecoverMetaStateData { enum ServerCrashState { SERVER_CRASH_START = 1; - SERVER_CRASH_PROCESS_META = 2; + SERVER_CRASH_PROCESS_META = 2[deprecated=true]; SERVER_CRASH_GET_REGIONS = 3; - SERVER_CRASH_NO_SPLIT_LOGS = 4; + SERVER_CRASH_NO_SPLIT_LOGS = 4[deprecated=true]; SERVER_CRASH_SPLIT_LOGS = 5; // Removed SERVER_CRASH_PREPARE_LOG_REPLAY = 6; // Removed SERVER_CRASH_CALC_REGIONS_TO_ASSIGN = 7; SERVER_CRASH_ASSIGN = 8; SERVER_CRASH_WAIT_ON_ASSIGN = 9; - SERVER_CRASH_HANDLE_RIT2 = 20; + SERVER_CRASH_SPLIT_META_LOGS = 10; + SERVER_CRASH_ASSIGN_META = 11; + SERVER_CRASH_HANDLE_RIT2 = 20[deprecated=true]; SERVER_CRASH_FINISH = 100; } @@ -445,3 +447,10 @@ enum ReopenTableRegionsState { message ReopenTableRegionsStateData { required TableName table_name = 1; } + +enum InitMetaState { + INIT_META_ASSIGN_META = 1; +} + +message InitMetaStateData { 
+} http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java index 23912d6..8515093 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/CatalogJanitor.java @@ -111,17 +111,14 @@ public class CatalogJanitor extends ScheduledChore { protected void chore() { try { AssignmentManager am = this.services.getAssignmentManager(); - if (this.enabled.get() - && !this.services.isInMaintenanceMode() - && am != null - && am.isFailoverCleanupDone() - && !am.hasRegionsInTransition()) { + if (this.enabled.get() && !this.services.isInMaintenanceMode() && am != null && + am.isMetaLoaded() && !am.hasRegionsInTransition()) { scan(); } else { LOG.warn("CatalogJanitor is disabled! 
Enabled=" + this.enabled.get() + - ", maintenanceMode=" + this.services.isInMaintenanceMode() + - ", am=" + am + ", failoverCleanupDone=" + (am != null && am.isFailoverCleanupDone()) + - ", hasRIT=" + (am != null && am.hasRegionsInTransition())); + ", maintenanceMode=" + this.services.isInMaintenanceMode() + ", am=" + am + + ", metaLoaded=" + (am != null && am.isMetaLoaded()) + ", hasRIT=" + + (am != null && am.hasRegionsInTransition())); } } catch (IOException e) { LOG.warn("Failed scan of catalog table", e); http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 38aac50..d6b793a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1,4 +1,4 @@ -/* +/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -82,6 +83,7 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.MasterSwitchType; import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -122,13 +124,14 @@ import org.apache.hadoop.hbase.master.procedure.DeleteNamespaceProcedure; import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure; import org.apache.hadoop.hbase.master.procedure.DisableTableProcedure; import org.apache.hadoop.hbase.master.procedure.EnableTableProcedure; +import org.apache.hadoop.hbase.master.procedure.InitMetaProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure; import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch; -import org.apache.hadoop.hbase.master.procedure.RecoverMetaProcedure; +import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure; import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure; import org.apache.hadoop.hbase.master.replication.AddPeerProcedure; import org.apache.hadoop.hbase.master.replication.DisablePeerProcedure; @@ -235,7 +238,7 @@ import 
org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos; @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) @SuppressWarnings("deprecation") public class HMaster extends HRegionServer implements MasterServices { - private static Logger LOG = LoggerFactory.getLogger(HMaster.class.getName()); + private static Logger LOG = LoggerFactory.getLogger(HMaster.class); /** * Protection against zombie master. Started once Master accepts active responsibility and @@ -351,10 +354,6 @@ public class HMaster extends HRegionServer implements MasterServices { // initialization may have not completed yet. volatile boolean serviceStarted = false; - // flag set after we complete assignMeta. - private final ProcedureEvent<?> serverCrashProcessingEnabled = - new ProcedureEvent<>("server crash processing"); - // Maximum time we should run balancer for private final int maxBlancingTime; // Maximum percent of regions in transition when balancing @@ -725,7 +724,8 @@ public class HMaster extends HRegionServer implements MasterServices { /** * <p> - * Initialize all ZK based system trackers. + * Initialize all ZK based system trackers. But do not include {@link RegionServerTracker}, it + * should have already been initialized along with {@link ServerManager}. * </p> * <p> * Will be overridden in tests. 
@@ -747,15 +747,8 @@ public class HMaster extends HRegionServer implements MasterServices { this.splitOrMergeTracker = new SplitOrMergeTracker(zooKeeper, conf, this); this.splitOrMergeTracker.start(); - // Create Assignment Manager - this.assignmentManager = new AssignmentManager(this); - this.assignmentManager.start(); - this.replicationPeerManager = ReplicationPeerManager.create(zooKeeper, conf); - this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); - this.regionServerTracker.start(); - this.drainingServerTracker = new DrainingServerTracker(zooKeeper, this, this.serverManager); this.drainingServerTracker.start(); @@ -800,18 +793,40 @@ public class HMaster extends HRegionServer implements MasterServices { /** * Finish initialization of HMaster after becoming the primary master. - * + * <p/> + * The startup order is a bit complicated but very important, do not change it unless you know + * what you are doing. * <ol> - * <li>Initialize master components - file system manager, server manager, - * assignment manager, region server tracker, etc</li> - * <li>Start necessary service threads - balancer, catalog janior, - * executor services, etc</li> - * <li>Set cluster as UP in ZooKeeper</li> - * <li>Wait for RegionServers to check-in</li> - * <li>Split logs and perform data recovery, if necessary</li> - * <li>Ensure assignment of meta/namespace regions<li> - * <li>Handle either fresh cluster start or master failover</li> + * <li>Initialize file system based components - file system manager, wal manager, table + * descriptors, etc</li> + * <li>Publish cluster id</li> + * <li>Here comes the most complicated part - initialize server manager, assignment manager and + * region server tracker + * <ol type='i'> + * <li>Create server manager</li> + * <li>Create procedure executor, load the procedures, but do not start workers. 
We will start it + * later after we finish scheduling SCPs to avoid scheduling duplicated SCPs for the same + * server</li> + * <li>Create assignment manager and start it, load the meta region state, but do not load data + * from meta region</li> + * <li>Start region server tracker, construct the online servers set and find out dead servers and + * schedule SCP for them. The online servers will be constructed by scanning zk, and we will also + * scan the wal directory to find out possible live region servers, and the differences between + * these two sets are the dead servers</li> + * </ol> + * </li> + * <li>If this is a new deploy, schedule an InitMetaProcedure to initialize meta</li> + * <li>Start necessary service threads - balancer, catalog janitor, executor services, and also the + * procedure executor, etc. Notice that the balancer must be created first as assignment manager + * may use it when assigning regions.</li> + * <li>Wait for meta to be initialized if necessary, start table state manager.</li> + * <li>Wait for enough region servers to check-in</li> + * <li>Let assignment manager load data from meta and construct region states</li> + * <li>Start all other things such as chore services, etc</li> * </ol> + * <p/> + * Notice that now we will not schedule a special procedure to make meta online (unless the first + * time where meta has not been created yet), we will rely on SCP to bring meta online. */ private void finishActiveMasterInitialization(MonitoredTask status) throws IOException, InterruptedException, KeeperException, ReplicationException { @@ -849,10 +864,20 @@ public class HMaster extends HRegionServer implements MasterServices { ZKClusterId.setClusterId(this.zooKeeper, fileSystemManager.getClusterId()); this.clusterId = clusterId.toString(); - this.serverManager = createServerManager(this); - // This manager is started AFTER hbase:meta is confirmed on line. - // See inside metaBootstrap.recoverMeta(); below. Shouldn't be so cryptic!
+ + status.setStatus("Initialze ServerManager and schedule SCP for crash servers"); + this.serverManager = createServerManager(this); + createProcedureExecutor(); + // Create Assignment Manager + this.assignmentManager = new AssignmentManager(this); + this.assignmentManager.start(); + this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.serverManager); + this.regionServerTracker.start( + procedureExecutor.getProcedures().stream().filter(p -> p instanceof ServerCrashProcedure) + .map(p -> ((ServerCrashProcedure) p).getServerName()).collect(Collectors.toSet()), + walManager.getLiveServersFromWALDir()); + // This manager will be started AFTER hbase:meta is confirmed on line. // hbase.mirror.table.state.to.zookeeper is so hbase1 clients can connect. They read table // state from zookeeper while hbase2 reads it from hbase:meta. Disable if no hbase1 clients. this.tableStateManager = @@ -888,10 +913,37 @@ public class HMaster extends HRegionServer implements MasterServices { status.setStatus("Initializing master coprocessors"); this.cpHost = new MasterCoprocessorHost(this, this.conf); + status.setStatus("Initializing meta table if this is a new deploy"); + InitMetaProcedure initMetaProc = null; + if (assignmentManager.getRegionStates().getRegionState(RegionInfoBuilder.FIRST_META_REGIONINFO) + .isOffline()) { + Optional<Procedure<?>> optProc = procedureExecutor.getProcedures().stream() + .filter(p -> p instanceof InitMetaProcedure).findAny(); + if (optProc.isPresent()) { + initMetaProc = (InitMetaProcedure) optProc.get(); + } else { + // schedule an init meta procedure if meta has not been deployed yet + initMetaProc = new InitMetaProcedure(); + procedureExecutor.submitProcedure(initMetaProc); + } + } + if (this.balancer instanceof FavoredNodesPromoter) { + favoredNodesManager = new FavoredNodesManager(this); + } + + // initialize load balancer + this.balancer.setMasterServices(this); + 
this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor()); + this.balancer.initialize(); + // start up all service threads. status.setStatus("Initializing master service threads"); startServiceThreads(); - + // wait meta to be initialized after we start procedure executor + if (initMetaProc != null) { + initMetaProc.await(); + } + tableStateManager.start(); // Wake up this server to check in sleeper.skipSleepCycle(); @@ -903,28 +955,11 @@ public class HMaster extends HRegionServer implements MasterServices { LOG.info(Objects.toString(status)); waitForRegionServers(status); - if (this.balancer instanceof FavoredNodesPromoter) { - favoredNodesManager = new FavoredNodesManager(this); - } - - //initialize load balancer - this.balancer.setMasterServices(this); - this.balancer.setClusterMetrics(getClusterMetricsWithoutCoprocessor()); - this.balancer.initialize(); - - // Make sure meta assigned before proceeding. - status.setStatus("Recovering Meta Region"); - // Check if master is shutting down because issue initializing regionservers or balancer. if (isStopped()) { return; } - // Bring up hbase:meta. recoverMeta is a blocking call waiting until hbase:meta is deployed. - // It also starts the TableStateManager. 
- MasterMetaBootstrap metaBootstrap = createMetaBootstrap(); - metaBootstrap.recoverMeta(); - //Initialize after meta as it scans meta if (favoredNodesManager != null) { SnapshotOfRegionAssignmentFromMeta snapshotOfRegionAssignment = @@ -933,9 +968,6 @@ public class HMaster extends HRegionServer implements MasterServices { favoredNodesManager.initialize(snapshotOfRegionAssignment); } - status.setStatus("Submitting log splitting work for previously failed region servers"); - metaBootstrap.processDeadServers(); - // Fix up assignment manager status status.setStatus("Starting assignment manager"); this.assignmentManager.joinCluster(); @@ -977,6 +1009,7 @@ public class HMaster extends HRegionServer implements MasterServices { setInitialized(true); assignmentManager.checkIfShouldMoveSystemRegionAsync(); status.setStatus("Assign meta replicas"); + MasterMetaBootstrap metaBootstrap = createMetaBootstrap(); metaBootstrap.assignMetaReplicas(); status.setStatus("Starting quota manager"); initQuotaManager(); @@ -1119,7 +1152,6 @@ public class HMaster extends HRegionServer implements MasterServices { private void initQuotaManager() throws IOException { MasterQuotaManager quotaManager = new MasterQuotaManager(this); - this.assignmentManager.setRegionStateListener(quotaManager); quotaManager.start(); this.quotaManager = quotaManager; } @@ -1281,10 +1313,10 @@ public class HMaster extends HRegionServer implements MasterServices { } } - private void startProcedureExecutor() throws IOException { - final MasterProcedureEnv procEnv = new MasterProcedureEnv(this); - procedureStore = new WALProcedureStore(conf, - new MasterProcedureEnv.WALStoreLeaseRecovery(this)); + private void createProcedureExecutor() throws IOException { + MasterProcedureEnv procEnv = new MasterProcedureEnv(this); + procedureStore = + new WALProcedureStore(conf, new MasterProcedureEnv.WALStoreLeaseRecovery(this)); procedureStore.registerListener(new MasterProcedureEnv.MasterProcedureStoreListener(this)); 
MasterProcedureScheduler procedureScheduler = procEnv.getProcedureScheduler(); procedureExecutor = new ProcedureExecutor<>(conf, procEnv, procedureStore, procedureScheduler); @@ -1297,10 +1329,17 @@ public class HMaster extends HRegionServer implements MasterServices { conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); procedureStore.start(numThreads); - procedureExecutor.start(numThreads, abortOnCorruption); + // Just initialize it but do not start the workers, we will start the workers later by calling + // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more + // details. + procedureExecutor.init(numThreads, abortOnCorruption); procEnv.getRemoteDispatcher().start(); } + private void startProcedureExecutor() throws IOException { + procedureExecutor.startWorkers(); + } + private void stopProcedureExecutor() { if (procedureExecutor != null) { configurationManager.deregisterObserver(procedureExecutor.getEnvironment()); @@ -2858,25 +2897,6 @@ public class HMaster extends HRegionServer implements MasterServices { } /** - * ServerCrashProcessingEnabled is set false before completing assignMeta to prevent processing - * of crashed servers. - * @return true if assignMeta has completed; - */ - @Override - public boolean isServerCrashProcessingEnabled() { - return serverCrashProcessingEnabled.isReady(); - } - - @VisibleForTesting - public void setServerCrashProcessingEnabled(final boolean b) { - procedureExecutor.getEnvironment().setEventReady(serverCrashProcessingEnabled, b); - } - - public ProcedureEvent<?> getServerCrashProcessingEnabledEvent() { - return serverCrashProcessingEnabled; - } - - /** * Compute the average load across all region servers. * Currently, this uses a very naive computation - just uses the number of * regions being served, ignoring stats about number of requests. 
@@ -3623,18 +3643,6 @@ public class HMaster extends HRegionServer implements MasterServices { return lockManager; } - @Override - public boolean recoverMeta() throws IOException { - // we need to block here so the latch should be greater than the current version to make sure - // that we will block. - ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(Integer.MAX_VALUE, 0); - procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, latch)); - latch.await(); - LOG.info("hbase:meta deployed at={}", - getMetaTableLocator().getMetaRegionLocation(getZooKeeper())); - return assignmentManager.isMetaInitialized(); - } - public QuotaObserverChore getQuotaObserverChore() { return this.quotaObserverChore; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java index dd46e41..ce21465 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMetaBootstrap.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master; import java.io.IOException; import java.util.List; -import java.util.Set; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -39,7 +38,7 @@ import org.slf4j.LoggerFactory; * Used by the HMaster on startup to split meta logs and assign the meta table. 
*/ @InterfaceAudience.Private -public class MasterMetaBootstrap { +class MasterMetaBootstrap { private static final Logger LOG = LoggerFactory.getLogger(MasterMetaBootstrap.class); private final HMaster master; @@ -48,35 +47,12 @@ public class MasterMetaBootstrap { this.master = master; } - public void recoverMeta() throws InterruptedException, IOException { - // This is a blocking call that waits until hbase:meta is deployed. - master.recoverMeta(); - // Now we can start the TableStateManager. It is backed by hbase:meta. - master.getTableStateManager().start(); - // Enable server crash procedure handling - enableCrashedServerProcessing(); - } - - public void processDeadServers() { - // get a list for previously failed RS which need log splitting work - // we recover hbase:meta region servers inside master initialization and - // handle other failed servers in SSH in order to start up master node ASAP - Set<ServerName> previouslyFailedServers = - master.getMasterWalManager().getFailedServersFromLogFolders(); - - // Master has recovered hbase:meta region server and we put - // other failed region servers in a queue to be handled later by SSH - for (ServerName tmpServer : previouslyFailedServers) { - master.getServerManager().processDeadServer(tmpServer, true); - } - } - /** * For assigning hbase:meta replicas only. * TODO: The way this assign runs, nothing but chance to stop all replicas showing up on same * server as the hbase:meta region. 
*/ - protected void assignMetaReplicas() + void assignMetaReplicas() throws IOException, InterruptedException, KeeperException { int numReplicas = master.getConfiguration().getInt(HConstants.META_REPLICAS_NUM, HConstants.DEFAULT_META_REPLICA_NUM); @@ -85,7 +61,7 @@ public class MasterMetaBootstrap { return; } final AssignmentManager assignmentManager = master.getAssignmentManager(); - if (!assignmentManager.isMetaInitialized()) { + if (!assignmentManager.isMetaLoaded()) { throw new IllegalStateException("hbase:meta must be initialized first before we can " + "assign out its replicas"); } @@ -137,15 +113,4 @@ public class MasterMetaBootstrap { LOG.warn("Ignoring exception " + ex); } } - - private void enableCrashedServerProcessing() throws InterruptedException { - // If crashed server processing is disabled, we enable it and expire those dead but not expired - // servers. This is required so that if meta is assigning to a server which dies after - // assignMeta starts assignment, ServerCrashProcedure can re-assign it. Otherwise, we will be - // stuck here waiting forever if waitForMeta is specified. 
- if (!master.isServerCrashProcessingEnabled()) { - master.setServerCrashProcessingEnabled(true); - master.getServerManager().processQueuedDeadServers(); - } - } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index 3d2b9af..ac521d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -321,11 +321,6 @@ public interface MasterServices extends Server { TableDescriptors getTableDescriptors(); /** - * @return true if master enables ServerShutdownHandler; - */ - boolean isServerCrashProcessingEnabled(); - - /** * Registers a new protocol buffer {@link Service} subclass as a master coprocessor endpoint. * * <p> @@ -494,13 +489,6 @@ public interface MasterServices extends Server { */ public void checkIfShouldMoveSystemRegionAsync(); - /** - * Recover meta table. Will result in no-op is meta is already initialized. Any code that has - * access to master and requires to access meta during process initialization can call this - * method to make sure meta is initialized. 
- */ - boolean recoverMeta() throws IOException; - String getClientIdAuditPrefix(); /** http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java index 4070ed3..2dc8918 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterWalManager.java @@ -26,7 +26,8 @@ import java.util.List; import java.util.Set; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -142,10 +143,34 @@ public class MasterWalManager { return this.fsOk; } + public Set<ServerName> getLiveServersFromWALDir() throws IOException { + Path walDirPath = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME); + FileStatus[] walDirForLiveServers = FSUtils.listStatus(fs, walDirPath, + p -> !p.getName().endsWith(AbstractFSWALProvider.SPLITTING_EXT)); + if (walDirForLiveServers == null) { + return Collections.emptySet(); + } + return Stream.of(walDirForLiveServers).map(s -> { + ServerName serverName = AbstractFSWALProvider.getServerNameFromWALDirectoryName(s.getPath()); + if (serverName == null) { + LOG.warn("Log folder {} doesn't look like its name includes a " + + "region server name; leaving in place. 
If you see later errors about missing " + + "write ahead logs they may be saved in this location.", s.getPath()); + return null; + } + return serverName; + }).filter(s -> s != null).collect(Collectors.toSet()); + } + /** * Inspect the log directory to find dead servers which need recovery work * @return A set of ServerNames which aren't running but still have WAL files left in file system + * @deprecated With proc-v2, we can record the crash server with procedure store, so do not need + * to scan the wal directory to find out the splitting wal directory any more. Leave + * it here only because {@code RecoverMetaProcedure}(which is also deprecated) uses + * it. */ + @Deprecated public Set<ServerName> getFailedServersFromLogFolders() { boolean retrySplitting = !conf.getBoolean("hbase.hlog.split.skip.errors", WALSplitter.SPLIT_SKIP_ERRORS_DEFAULT); @@ -240,6 +265,7 @@ public class MasterWalManager { boolean needReleaseLock = false; if (!this.services.isInitialized()) { // during master initialization, we could have multiple places splitting a same wal + // XXX: Does this still exist after we move to proc-v2? this.splitLogLock.lock(); needReleaseLock = true; } http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java index 12c8e57..c599f78 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/RegionServerTracker.java @@ -46,19 +46,15 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo; /** - * <p> * Tracks the online region servers via ZK. 
- * </p> - * <p> + * <p/> * Handling of new RSs checking in is done via RPC. This class is only responsible for watching for * expired nodes. It handles listening for changes in the RS node list. The only exception is when * master restart, we will use the list fetched from zk to construct the initial set of live region * servers. - * </p> - * <p> + * <p/> * If an RS node gets deleted, this automatically handles calling of * {@link ServerManager#expireServer(ServerName)} - * </p> */ @InterfaceAudience.Private public class RegionServerTracker extends ZKListener { @@ -76,7 +72,7 @@ public class RegionServerTracker extends ZKListener { super(watcher); this.server = server; this.serverManager = serverManager; - executor = Executors.newSingleThreadExecutor( + this.executor = Executors.newSingleThreadExecutor( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("RegionServerTracker-%d").build()); } @@ -109,14 +105,19 @@ public class RegionServerTracker extends ZKListener { } /** - * <p> - * Starts the tracking of online RegionServers. - * </p> - * <p> - * All RSs will be tracked after this method is called. - * </p> + * Starts the tracking of online RegionServers. All RSes will be tracked after this method is + * called. + * <p/> + * In this method, we will also construct the region server sets in {@link ServerManager}. If a + * region server is dead between the crash of the previous master instance and the start of the + * current master instance, we will schedule a SCP for it. This is done in + * {@link ServerManager#findOutDeadServersAndProcess(Set, Set)}, we call it here under the lock + * protection to prevent concurrency issues with server expiration operation. + * @param deadServersFromPE the region servers which already have SCP associated. + * @param liveServersFromWALDir the live region servers from wal directory. 
*/ - public void start() throws KeeperException, IOException { + public void start(Set<ServerName> deadServersFromPE, Set<ServerName> liveServersFromWALDir) + throws KeeperException, IOException { watcher.registerListener(this); synchronized (this) { List<String> servers = @@ -132,6 +133,7 @@ public class RegionServerTracker extends ZKListener { : ServerMetricsBuilder.of(serverName); serverManager.checkAndRecordNewServer(serverName, serverMetrics); } + serverManager.findOutDeadServersAndProcess(deadServersFromPE, liveServersFromWALDir); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/6dbbd78a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index cfbd52f..201466e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -25,13 +25,11 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -180,41 +178,6 @@ public class ServerManager { private final RpcControllerFactory rpcControllerFactory; - /** - * Set of region servers which are dead but not processed immediately. If one - * server died before master enables ServerShutdownHandler, the server will be - * added to this set and will be processed through calling - * {@link ServerManager#processQueuedDeadServers()} by master. 
- * <p> - * A dead server is a server instance known to be dead, not listed in the /hbase/rs - * znode any more. It may have not been submitted to ServerShutdownHandler yet - * because the handler is not enabled. - * <p> - * A dead server, which has been submitted to ServerShutdownHandler while the - * handler is not enabled, is queued up. - * <p> - * So this is a set of region servers known to be dead but not submitted to - * ServerShutdownHandler for processing yet. - */ - private Set<ServerName> queuedDeadServers = new HashSet<>(); - - /** - * Set of region servers which are dead and submitted to ServerShutdownHandler to process but not - * fully processed immediately. - * <p> - * If one server died before assignment manager finished the failover cleanup, the server will be - * added to this set and will be processed through calling - * {@link ServerManager#processQueuedDeadServers()} by assignment manager. - * <p> - * The Boolean value indicates whether log split is needed inside ServerShutdownHandler - * <p> - * ServerShutdownHandler processes a dead server submitted to the handler after the handler is - * enabled. It may not be able to complete the processing because meta is not yet online or master - * is currently in startup mode. In this case, the dead server will be parked in this set - * temporarily. - */ - private Map<ServerName, Boolean> requeuedDeadServers = new ConcurrentHashMap<>(); - /** Listeners that are called on server events. */ private List<ServerListener> listeners = new CopyOnWriteArrayList<>(); @@ -378,6 +341,26 @@ public class ServerManager { } /** + * Find out the region servers crashed between the crash of the previous master instance and the + * current master instance and schedule SCP for them. 
+ * <p/> + * Since the {@code RegionServerTracker} has already helped us to construct the online servers set + * by scanning zookeeper, now we can compare the online servers with {@code liveServersFromWALDir} + * to find out whether there are servers which are already dead. + * <p/> + * Must be called inside the initialization method of {@code RegionServerTracker} to avoid + * concurrency issue. + * @param deadServersFromPE the region servers which already have SCP associated. + * @param liveServersFromWALDir the live region servers from wal directory. + */ + void findOutDeadServersAndProcess(Set<ServerName> deadServersFromPE, + Set<ServerName> liveServersFromWALDir) { + deadServersFromPE.forEach(deadservers::add); + liveServersFromWALDir.stream().filter(sn -> !onlineServers.containsKey(sn)) + .forEach(this::expireServer); + } + + /** * Checks if the clock skew between the server and the master. If the clock skew exceeds the * configured max, it will throw an exception; if it exceeds the configured warning threshold, * it will log a warning but start normally. @@ -386,7 +369,7 @@ public class ServerManager { * @throws ClockOutOfSyncException if the skew exceeds the configured max value */ private void checkClockSkew(final ServerName serverName, final long serverCurrentTime) - throws ClockOutOfSyncException { + throws ClockOutOfSyncException { long skew = Math.abs(System.currentTimeMillis() - serverCurrentTime); if (skew > maxSkew) { String message = "Server " + serverName + " has been " + @@ -406,9 +389,7 @@ public class ServerManager { * If this server is on the dead list, reject it with a YouAreDeadException. * If it was dead but came back with a new start code, remove the old entry * from the dead list. 
- * @param serverName * @param what START or REPORT - * @throws org.apache.hadoop.hbase.YouAreDeadException */ private void checkIsDead(final ServerName serverName, final String what) throws YouAreDeadException { @@ -589,13 +570,12 @@ public class ServerManager { return ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode); } - /* - * Expire the passed server. Add it to list of dead servers and queue a - * shutdown processing. - * @return True if we queued a ServerCrashProcedure else false if we did not (could happen - * for many reasons including the fact that its this server that is going down or we already - * have queued an SCP for this server or SCP processing is currently disabled because we are - * in startup phase). + /** + * Expire the passed server. Add it to list of dead servers and queue a shutdown processing. + * @return True if we queued a ServerCrashProcedure else false if we did not (could happen for + * many reasons including the fact that its this server that is going down or we already + * have queued an SCP for this server or SCP processing is currently disabled because we + * are in startup phase). */ public synchronized boolean expireServer(final ServerName serverName) { // THIS server is going down... can't handle our own expiration. @@ -605,18 +585,6 @@ public class ServerManager { } return false; } - // No SCP handling during startup. - if (!master.isServerCrashProcessingEnabled()) { - LOG.info("Master doesn't enable ServerShutdownHandler during initialization, " - + "delay expiring server " + serverName); - // Even though we delay expire of this server, we still need to handle Meta's RIT - // that are against the crashed server; since when we do RecoverMetaProcedure, - // the SCP is not enabled yet and Meta's RIT may be suspend forever. 
See HBase-19287 - master.getAssignmentManager().handleMetaRITOnCrashedServer(serverName); - this.queuedDeadServers.add(serverName); - // Return true because though on SCP queued, there will be one queued later. - return true; - } if (this.deadservers.isDeadServer(serverName)) { LOG.warn("Expiration called on {} but crash processing already in progress", serverName); return false; @@ -665,52 +633,6 @@ public class ServerManager { this.rsAdmins.remove(sn); } - public synchronized void processDeadServer(final ServerName serverName, boolean shouldSplitWal) { - // When assignment manager is cleaning up the zookeeper nodes and rebuilding the - // in-memory region states, region servers could be down. Meta table can and - // should be re-assigned, log splitting can be done too. However, it is better to - // wait till the cleanup is done before re-assigning user regions. - // - // We should not wait in the server shutdown handler thread since it can clog - // the handler threads and meta table could not be re-assigned in case - // the corresponding server is down. So we queue them up here instead. - if (!master.getAssignmentManager().isFailoverCleanupDone()) { - requeuedDeadServers.put(serverName, shouldSplitWal); - return; - } - - this.deadservers.add(serverName); - master.getAssignmentManager().submitServerCrash(serverName, shouldSplitWal); - } - - /** - * Process the servers which died during master's initialization. It will be - * called after HMaster#assignMeta and AssignmentManager#joinCluster. 
- * */ - synchronized void processQueuedDeadServers() { - if (!master.isServerCrashProcessingEnabled()) { - LOG.info("Master hasn't enabled ServerShutdownHandler"); - } - Iterator<ServerName> serverIterator = queuedDeadServers.iterator(); - while (serverIterator.hasNext()) { - ServerName tmpServerName = serverIterator.next(); - expireServer(tmpServerName); - serverIterator.remove(); - requeuedDeadServers.remove(tmpServerName); - } - - if (!master.getAssignmentManager().isFailoverCleanupDone()) { - if (LOG.isTraceEnabled()) { - LOG.trace("AssignmentManager failover cleanup not done."); - } - } - - for (Map.Entry<ServerName, Boolean> entry : requeuedDeadServers.entrySet()) { - processDeadServer(entry.getKey(), entry.getValue()); - } - requeuedDeadServers.clear(); - } - /* * Remove the server from the drain list. */ @@ -975,13 +897,6 @@ public class ServerManager { return new ArrayList<>(this.drainingServers); } - /** - * @return A copy of the internal set of deadNotExpired servers. - */ - Set<ServerName> getDeadNotExpiredServers() { - return new HashSet<>(this.queuedDeadServers); - } - public boolean isServerOnline(ServerName serverName) { return serverName != null && onlineServers.containsKey(serverName); } @@ -993,9 +908,7 @@ public class ServerManager { * master any more, for example, a very old previous instance). */ public synchronized boolean isServerDead(ServerName serverName) { - return serverName == null || deadservers.isDeadServer(serverName) - || queuedDeadServers.contains(serverName) - || requeuedDeadServers.containsKey(serverName); + return serverName == null || deadservers.isDeadServer(serverName); } public void shutdownCluster() { @@ -1061,8 +974,6 @@ public class ServerManager { final List<ServerName> drainingServersCopy = getDrainingServersList(); destServers.removeAll(drainingServersCopy); - // Remove the deadNotExpired servers from the server list. 
- removeDeadNotExpiredServers(destServers); return destServers; } @@ -1073,23 +984,6 @@ public class ServerManager { return createDestinationServersList(null); } - /** - * Loop through the deadNotExpired server list and remove them from the - * servers. - * This function should be used carefully outside of this class. You should use a high level - * method such as {@link #createDestinationServersList()} instead of managing you own list. - */ - void removeDeadNotExpiredServers(List<ServerName> servers) { - Set<ServerName> deadNotExpiredServersCopy = this.getDeadNotExpiredServers(); - if (!deadNotExpiredServersCopy.isEmpty()) { - for (ServerName server : deadNotExpiredServersCopy) { - LOG.debug("Removing dead but not expired server: " + server - + " from eligible server pool."); - servers.remove(server); - } - } - } - /** * To clear any dead server with same host name and port of any online server */ @@ -1259,7 +1153,6 @@ public class ServerManager { } } - private class FlushedSequenceIdFlusher extends ScheduledChore { public FlushedSequenceIdFlusher(String name, int p) {