Repository: hbase Updated Branches: refs/heads/branch-2.0 5834a4f90 -> 6b8cfd276
HBASE-21423 Procedures for meta table/region should be able to execute in separate workers Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6b8cfd27 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6b8cfd27 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6b8cfd27 Branch: refs/heads/branch-2.0 Commit: 6b8cfd276fc1e33c691f93575ed9ff2df06c08e2 Parents: 5834a4f Author: Allan Yang <allan...@apache.org> Authored: Mon Nov 5 20:23:19 2018 +0800 Committer: Allan Yang <allan...@apache.org> Committed: Mon Nov 5 20:23:19 2018 +0800 ---------------------------------------------------------------------- .../procedure2/AbstractProcedureScheduler.java | 29 ++- .../hbase/procedure2/ProcedureExecutor.java | 64 ++++++- .../hbase/procedure2/ProcedureScheduler.java | 17 ++ .../procedure2/SimpleProcedureScheduler.java | 2 +- .../procedure2/ProcedureTestingUtility.java | 10 +- .../hbase/procedure2/TestChildProcedures.java | 2 +- .../hbase/procedure2/TestProcedureExecutor.java | 2 +- .../procedure2/TestProcedureSuspended.java | 3 +- .../hbase/procedure2/TestYieldProcedures.java | 4 +- .../org/apache/hadoop/hbase/master/HMaster.java | 5 +- .../procedure/MasterProcedureConstants.java | 7 + .../procedure/MasterProcedureScheduler.java | 8 +- .../TestSplitTableRegionProcedure.java | 2 + .../master/procedure/TestProcedurePriority.java | 1 + .../procedure/TestServerCrashProcedure.java | 2 + .../procedure/TestTableDDLProcedureBase.java | 2 + .../procedure/TestUrgentProcedureWorker.java | 188 +++++++++++++++++++ 17 files changed, 327 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index 7ab1329..b2a2e5a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -139,20 +139,39 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { * NOTE: this method is called with the sched lock held. * @return the Procedure to execute, or null if nothing is available. */ - protected abstract Procedure dequeue(); + protected Procedure dequeue() { + return dequeue(false); + } + + protected abstract Procedure dequeue(boolean onlyUrgent); + + + @Override + public Procedure poll(boolean onlyUrgent) { + return poll(onlyUrgent, -1); + } @Override public Procedure poll() { - return poll(-1); + return poll(false, -1); + } + + @Override + public Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit) { + return poll(onlyUrgent, unit.toNanos(timeout)); } @Override public Procedure poll(long timeout, TimeUnit unit) { - return poll(unit.toNanos(timeout)); + return poll(false, unit.toNanos(timeout)); } - @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") public Procedure poll(final long nanos) { + return poll(false, nanos); + } + + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + public Procedure poll(final boolean onlyUrgent, final long nanos) { schedLock(); try { if (!running) { @@ -173,7 +192,7 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { return null; } } - final Procedure pollResult = dequeue(); + final Procedure pollResult = dequeue(onlyUrgent); pollCalls++; nullPollCalls += (pollResult == null) ? 
1 : 0; http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index b1f3de3..3bd5e0f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -209,6 +209,11 @@ public class ProcedureExecutor<TEnvironment> { private CopyOnWriteArrayList<WorkerThread> workerThreads; /** + * Worker thread only for urgent tasks. + */ + private List<WorkerThread> urgentWorkerThreads; + + /** * Created in the {@link #init(int, boolean)} method. Terminated in {@link #join()} (FIX! Doing * resource handling rather than observing in a #join is unexpected). * Overridden when we do the ProcedureTestingUtility.testRecoveryAndDoubleExecution trickery @@ -218,6 +223,7 @@ public class ProcedureExecutor<TEnvironment> { private int corePoolSize; private int maxPoolSize; + private int urgentPoolSize; private volatile long keepAliveTime; @@ -558,12 +564,30 @@ public class ProcedureExecutor<TEnvironment> { * is found on replay. otherwise false. */ public void init(int numThreads, boolean abortOnCorruption) throws IOException { + init(numThreads, 1, abortOnCorruption); + } + + /** + * Initialize the procedure executor, but do not start workers. We will start them later. + * <p/> + * It calls ProcedureStore.recoverLease() and ProcedureStore.load() to recover the lease, and + * ensure a single executor, and start the procedure replay to resume and recover the previous + * pending and in-progress procedures. + * @param numThreads number of threads available for procedure execution. 
+ * @param urgentNumThreads number of threads available for urgent procedure execution. + * @param abortOnCorruption true if you want to abort your service in case a corrupted procedure + * is found on replay. otherwise false. + */ + public void init(int numThreads, int urgentNumThreads, + boolean abortOnCorruption) throws IOException { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; this.maxPoolSize = 10 * numThreads; - LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker count={}", - corePoolSize, maxPoolSize); + this.urgentPoolSize = urgentNumThreads; + LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max (burst) worker " + + "count={}, start {} urgent thread(s)", + corePoolSize, maxPoolSize, urgentPoolSize); this.threadGroup = new ThreadGroup("PEWorkerGroup"); this.timeoutExecutor = new TimeoutExecutorThread<>(this, threadGroup); @@ -571,9 +595,14 @@ public class ProcedureExecutor<TEnvironment> { // Create the workers workerId.set(0); workerThreads = new CopyOnWriteArrayList<>(); + urgentWorkerThreads = new ArrayList<>(); for (int i = 0; i < corePoolSize; ++i) { workerThreads.add(new WorkerThread(threadGroup)); } + for (int i = 0; i < urgentNumThreads; ++i) { + urgentWorkerThreads + .add(new WorkerThread(threadGroup, "UrgentPEWorker-", true)); + } long st, et; @@ -608,12 +637,17 @@ public class ProcedureExecutor<TEnvironment> { return; } // Start the executors. Here we must have the lastProcId set. 
- LOG.trace("Start workers {}", workerThreads.size()); + LOG.debug("Start workers {}, urgent workers", workerThreads.size(), + urgentWorkerThreads.size()); timeoutExecutor.start(); for (WorkerThread worker: workerThreads) { worker.start(); } + for (WorkerThread worker: urgentWorkerThreads) { + worker.start(); + } + // Internal chores timeoutExecutor.add(new WorkerMonitor()); @@ -663,6 +697,11 @@ public class ProcedureExecutor<TEnvironment> { worker.awaitTermination(); } + // stop the worker threads + for (WorkerThread worker: urgentWorkerThreads) { + worker.awaitTermination(); + } + // Destroy the Thread Group for the executors // TODO: Fix. #join is not place to destroy resources. try { @@ -700,7 +739,7 @@ public class ProcedureExecutor<TEnvironment> { * @return the current number of worker threads. */ public int getWorkerThreadCount() { - return workerThreads.size(); + return workerThreads.size() + urgentWorkerThreads.size(); } /** @@ -710,6 +749,10 @@ public class ProcedureExecutor<TEnvironment> { return corePoolSize; } + public int getUrgentPoolSize() { + return urgentPoolSize; + } + public int getActiveExecutorCount() { return activeExecutorCount.get(); } @@ -1949,13 +1992,18 @@ public class ProcedureExecutor<TEnvironment> { private class WorkerThread extends StoppableThread { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); private volatile Procedure<TEnvironment> activeProcedure; - + private boolean onlyPollUrgent = false; public WorkerThread(ThreadGroup group) { this(group, "PEWorker-"); } protected WorkerThread(ThreadGroup group, String prefix) { + this(group, prefix, false); + } + + protected WorkerThread(ThreadGroup group, String prefix, boolean onlyPollUrgent) { super(group, prefix + workerId.incrementAndGet()); + this.onlyPollUrgent = onlyPollUrgent; setDaemon(true); } @@ -2000,7 +2048,11 @@ public class ProcedureExecutor<TEnvironment> { } finally { LOG.trace("Worker terminated."); } - workerThreads.remove(this); + if 
(onlyPollUrgent) { + urgentWorkerThreads.remove(this); + } else { + workerThreads.remove(this); + } } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index 9489f52..2d16849 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -104,6 +104,13 @@ public interface ProcedureScheduler { /** * Fetch one Procedure from the queue + * @param onlyUrgent Only poll the urgent procedure to execute + * @return a Procedure + */ + Procedure poll(boolean onlyUrgent); + + /** + * Fetch one Procedure from the queue * @param timeout how long to wait before giving up, in units of unit * @param unit a TimeUnit determining how to interpret the timeout parameter * @return the Procedure to execute, or null if nothing present. @@ -111,6 +118,16 @@ public interface ProcedureScheduler { Procedure poll(long timeout, TimeUnit unit); /** + * Fetch one Procedure from the queue + * @param onlyUrgent Only poll the urgent procedure to execute + * @param timeout how long to wait before giving up, in units of unit + * @param unit a TimeUnit determining how to interpret the timeout parameter + * @return a Procedure + */ + Procedure poll(boolean onlyUrgent, long timeout, TimeUnit unit); + + + /** * List lock queues. 
* @return the locks */ http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java index feab8be..3df0e20 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SimpleProcedureScheduler.java @@ -43,7 +43,7 @@ public class SimpleProcedureScheduler extends AbstractProcedureScheduler { } @Override - protected Procedure dequeue() { + protected Procedure dequeue(boolean onlyUrgent) { return runnables.poll(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index e82fc7d..1709f63 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -87,7 +87,12 @@ public class ProcedureTestingUtility { public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, int numThreads, boolean abortOnCorruption, boolean startWorkers) throws IOException { - procExecutor.init(numThreads, abortOnCorruption); + initAndStartWorkers(procExecutor, numThreads, 1, abortOnCorruption, startWorkers); + } + + public static void initAndStartWorkers(ProcedureExecutor<?> procExecutor, 
int numThreads, + int numUrgentThreads, boolean abortOnCorruption, boolean startWorkers) throws IOException { + procExecutor.init(numThreads, numUrgentThreads, abortOnCorruption); if (startWorkers) { procExecutor.startWorkers(); } @@ -109,6 +114,7 @@ public class ProcedureTestingUtility { final ProcedureStore procStore = procExecutor.getStore(); final int storeThreads = procExecutor.getCorePoolSize(); final int execThreads = procExecutor.getCorePoolSize(); + final int urgentThreads = procExecutor.getUrgentPoolSize(); final ProcedureExecutor.Testing testing = procExecutor.testing; if (avoidTestKillDuringRestart) { @@ -130,7 +136,7 @@ public class ProcedureTestingUtility { // re-start LOG.info("RESTART - Start"); procStore.start(storeThreads); - initAndStartWorkers(procExecutor, execThreads, failOnCorrupted, startWorkers); + initAndStartWorkers(procExecutor, execThreads, urgentThreads, failOnCorrupted, startWorkers); if (startAction != null) { startAction.call(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java index 4f3c443..b837e82 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestChildProcedures.java @@ -69,7 +69,7 @@ public class TestChildProcedures { procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), procEnv, procStore); procExecutor.testing = new ProcedureExecutor.Testing(); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); + 
ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java index 7f130ca..2fe19f3 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java @@ -71,7 +71,7 @@ public class TestProcedureExecutor { private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception { procExecutor = new ProcedureExecutor<>(conf, procEnv, procStore); - ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, numThreads, 0, false, true); } @Test http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java index c1c9187..ec9d27f 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureSuspended.java @@ -62,7 +62,8 @@ public class TestProcedureSuspended { procStore = new NoopProcedureStore(); procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore); 
procStore.start(PROCEDURE_EXECUTOR_SLOTS); - ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility + .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java index b5137b0..8a2e296 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestYieldProcedures.java @@ -74,7 +74,7 @@ public class TestYieldProcedures { procExecutor = new ProcedureExecutor<>(htu.getConfiguration(), new TestProcEnv(), procStore, procRunnables); procStore.start(PROCEDURE_EXECUTOR_SLOTS); - ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); + ProcedureTestingUtility.initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true); } @After @@ -379,6 +379,7 @@ public class TestYieldProcedures { @Override public Procedure poll() { + LOG.error("polled()"); pollCalls++; return super.poll(); } @@ -386,6 +387,7 @@ public class TestYieldProcedures { @Override public Procedure poll(long timeout, TimeUnit unit) { pollCalls++; + LOG.error("polled(long timeout, TimeUnit unit)"); return super.poll(timeout, unit); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 5bab8cc..288b33f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -1396,6 +1396,9 @@ public class HMaster extends HRegionServer implements MasterServices { int cpus = Runtime.getRuntime().availableProcessors(); final int numThreads = conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max( (cpus > 0 ? cpus / 4 : 0), MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS)); + final int urgentWorkers = conf + .getInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, + MasterProcedureConstants.DEFAULT_MASTER_URGENT_PROCEDURE_THREADS); final boolean abortOnCorruption = conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION, MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION); @@ -1403,7 +1406,7 @@ public class HMaster extends HRegionServer implements MasterServices { // Just initialize it but do not start the workers, we will start the workers later by calling // startProcedureExecutor. See the javadoc for finishActiveMasterInitialization for more // details. 
- procedureExecutor.init(numThreads, abortOnCorruption); + procedureExecutor.init(numThreads, urgentWorkers, abortOnCorruption); procEnv.getRemoteDispatcher().start(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java index 495fab6..728ad43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureConstants.java @@ -28,6 +28,13 @@ public final class MasterProcedureConstants { public static final String MASTER_PROCEDURE_THREADS = "hbase.master.procedure.threads"; public static final int DEFAULT_MIN_MASTER_PROCEDURE_THREADS = 16; + /** Number of threads used by the procedure executor for urgent procedures. + * For now, only meta table procedures are urgent. + */ + public static final String MASTER_URGENT_PROCEDURE_THREADS = + "hbase.master.urgent.procedure.threads"; + public static final int DEFAULT_MASTER_URGENT_PROCEDURE_THREADS = 1; + /** * Procedure replay sanity check. In case a WAL is missing or unreadable we * may lose information about pending/running procedures.
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java index 7bab7b3..b060763 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/MasterProcedureScheduler.java @@ -124,7 +124,8 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { @Override protected void enqueue(final Procedure proc, final boolean addFront) { - if (isMetaProcedure(proc)) { + if (isMetaProcedure(proc) || + (isTableProcedure(proc) && getTableName(proc).equals(TableName.META_TABLE_NAME))) { doAdd(metaRunQueue, getMetaQueue(), proc, addFront); } else if (isTableProcedure(proc)) { doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); @@ -162,9 +163,12 @@ public class MasterProcedureScheduler extends AbstractProcedureScheduler { } @Override - protected Procedure dequeue() { + protected Procedure dequeue(boolean onlyUrgent) { // meta procedure is always the first priority Procedure<?> pollResult = doPoll(metaRunQueue); + if (onlyUrgent) { + return pollResult; + } // For now, let server handling have precedence over table handling; presumption is that it // is more important handling crashed servers than it is running the // enabling/disabling tables, etc. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java index 1922848..7e37e44 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestSplitTableRegionProcedure.java @@ -97,6 +97,8 @@ public class TestSplitTableRegionProcedure { private static void setupConf(Configuration conf) { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + //testRecoveryAndDoubleExecution requires only one worker + conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0); conf.setLong(HConstants.MAJOR_COMPACTION_PERIOD, 0); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java index 32fb173..9008dcc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java @@ -106,6 +106,7 @@ public class TestProcedurePriority { public static void setUp() throws Exception { UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY, 5000); 
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 4); + UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0); UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, MyCP.class.getName()); UTIL.startMiniCluster(3); CORE_POOL_SIZE = http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java index 9f7fafe..cef60f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestServerCrashProcedure.java @@ -60,6 +60,8 @@ public class TestServerCrashProcedure { private void setupConf(Configuration conf) { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + //testxxxDoubleExecution requires only one worker + conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0); conf.set("hbase.balancer.tablesOnMaster", "none"); conf.setInt("hbase.client.retries.number", 3); } http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java index f7cf640..9680627 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestTableDDLProcedureBase.java @@ -39,6 +39,8 @@ public abstract class TestTableDDLProcedureBase { private static void setupConf(Configuration conf) { conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1); + //testxxxDoubleExecution requires only one worker + conf.setInt(MasterProcedureConstants.MASTER_URGENT_PROCEDURE_THREADS, 0); } @BeforeClass http://git-wip-us.apache.org/repos/asf/hbase/blob/6b8cfd27/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java new file mode 100644 index 0000000..c7801e4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/TestUrgentProcedureWorker.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.procedure; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; +import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface; +import org.apache.hadoop.hbase.procedure2.Procedure; +import org.apache.hadoop.hbase.procedure2.ProcedureExecutor; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureYieldException; +import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +@Category({MasterTests.class, SmallTests.class}) +public class TestUrgentProcedureWorker { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestUrgentProcedureWorker.class); + + private static final Logger LOG = LoggerFactory + .getLogger(TestUrgentProcedureWorker.class); + private static final int PROCEDURE_EXECUTOR_SLOTS = 1; + private static final CountDownLatch metaFinished = new CountDownLatch(1); + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("TestUrgentProcedureWorker"); + + private static WALProcedureStore procStore; + + private static ProcedureExecutor<TestEnv> procExec; + + private static final class TestEnv { + private final MasterProcedureScheduler 
scheduler; + + public TestEnv(MasterProcedureScheduler scheduler) { + this.scheduler = scheduler; + } + + public MasterProcedureScheduler getScheduler() { + return scheduler; + } + } + + public static class WaitingMetaProcedure extends ProcedureTestingUtility.NoopProcedure<TestEnv> + implements TableProcedureInterface { + + + @Override + protected Procedure<TestEnv>[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, + InterruptedException { + metaFinished.await(); + return null; + } + + @Override + protected Procedure.LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableExclusiveLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TABLE_NAME; + } + + @Override + public TableOperationType getTableOperationType() { + return TableOperationType.EDIT; + } + } + + public static class MetaProcedure extends ProcedureTestingUtility.NoopProcedure<TestEnv> + implements TableProcedureInterface { + + + @Override + protected Procedure<TestEnv>[] execute(TestEnv env) + throws ProcedureYieldException, ProcedureSuspendedException, + InterruptedException { + metaFinished.countDown(); + return null; + } + + @Override + protected Procedure.LockState acquireLock(TestEnv env) { + if (env.getScheduler().waitTableExclusiveLock(this, getTableName())) { + return LockState.LOCK_EVENT_WAIT; + } + return LockState.LOCK_ACQUIRED; + } + + @Override + protected void releaseLock(TestEnv env) { + env.getScheduler().wakeTableExclusiveLock(this, getTableName()); + } + + @Override + protected boolean holdLock(TestEnv env) { + return true; + } + + @Override + public TableName getTableName() { + return TableName.META_TABLE_NAME; + } + + 
@Override + public TableOperationType getTableOperationType() { + return TableOperationType.EDIT; + } + } + + @AfterClass + public static void tearDownAfterClass() throws IOException { + UTIL.cleanupTestDir(); + } + + @BeforeClass + public static void setUp() throws IOException { + UTIL.getConfiguration().setInt("hbase.procedure.worker.stuck.threshold.msec", 6000000); + procStore = ProcedureTestingUtility.createWalStore(UTIL.getConfiguration(), + UTIL.getDataTestDir("TestUrgentProcedureWorker")); + procStore.start(1); + MasterProcedureScheduler scheduler = new MasterProcedureScheduler(pid -> null); + procExec = new ProcedureExecutor<>(UTIL.getConfiguration(), new TestEnv(scheduler), procStore, + scheduler); + procExec.init(1, false); + procExec.startWorkers(); + } + + @Test + public void test() throws Exception { + WaitingMetaProcedure waitingMetaProcedure = new WaitingMetaProcedure(); + long waitProc = procExec.submitProcedure(waitingMetaProcedure); + MetaProcedure metaProcedure = new MetaProcedure(); + long metaProc = procExec.submitProcedure(metaProcedure); + UTIL.waitFor(5000, () -> procExec.isFinished(waitProc)); + + } + + + +}