Repository: hbase Updated Branches: refs/heads/branch-2.0 6584a76d3 -> e8404c7c2
HBASE-21437 Bypassed procedure throws IllegalArgumentException when its state is WAITING_TIMEOUT Signed-off-by: Allan Yang <allan...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8404c7c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8404c7c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8404c7c Branch: refs/heads/branch-2.0 Commit: e8404c7c21b5ceaddd1359037be1303171c97ae9 Parents: 6584a76 Author: jingyuntian <tianjy1...@gmail.com> Authored: Fri Nov 9 22:45:43 2018 +0800 Committer: Allan Yang <allan...@apache.org> Committed: Fri Nov 9 22:45:43 2018 +0800 ---------------------------------------------------------------------- .../hbase/procedure2/ProcedureExecutor.java | 19 ++++++---- .../hbase/procedure2/TimeoutExecutorThread.java | 2 +- .../hbase/procedure2/TestProcedureBypass.java | 38 +++++++++++++++++++- 3 files changed, 51 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e8404c7c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 3bd5e0f..a70a9ef 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -1033,15 +1033,22 @@ public class ProcedureExecutor<TEnvironment> { store.update(procedure); } - // If we don't have the lock, we can't re-submit the queue, - // since it is already executing. To get rid of the stuck situation, we - // need to restart the master.
With the procedure set to bypass, the procedureExecutor - // will bypass it and won't get stuck again. - if (lockEntry != null) { - // add the procedure to run queue, + // If state of procedure is WAITING_TIMEOUT, we can directly submit it to the scheduler. + // Instead we should remove it from timeout Executor queue and tranfer its state to RUNNABLE + if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", procedure); + if (timeoutExecutor.remove(procedure)) { + LOG.debug("removed procedure {} from timeoutExecutor", procedure); + timeoutExecutor.executeTimedoutProcedure(procedure); + } + } else if (lockEntry != null) { scheduler.addFront(procedure); LOG.info("Bypassing {} and its ancestors successfully, adding to queue", procedure); } else { + // If we don't have the lock, we can't re-submit the queue, + // since it is already executing. To get rid of the stuck situation, we + // need to restart the master. With the procedure set to bypass, the procedureExecutor + // will bypass it and won't get stuck again. 
LOG.info("Bypassing {} and its ancestors successfully, but since it is already running, " + "skipping add to queue", procedure); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e8404c7c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java index 9e050a2..4416177 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java @@ -126,7 +126,7 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread { } } - private void executeTimedoutProcedure(Procedure<TEnvironment> proc) { + protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) { // The procedure received a timeout. if the procedure itself does not handle it, // call abort() and add the procedure back in the queue for rollback. 
if (proc.setTimeoutFailure(executor.getEnvironment())) { http://git-wip-us.apache.org/repos/asf/hbase/blob/e8404c7c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java index fa40631..de7a0a1 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java @@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos; @Category({MasterTests.class, SmallTests.class}) @@ -80,7 +81,7 @@ public class TestProcedureBypass { procStore); procStore.start(PROCEDURE_EXECUTOR_SLOTS); ProcedureTestingUtility - .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true); + .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, true); } @Test @@ -149,6 +150,18 @@ public class TestProcedureBypass { LOG.info("{} finished", proc); } + @Test + public void testBypassingWaitingTimeoutProcedures() throws Exception { + final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure(); + long id = procExecutor.submitProcedure(proc); + Thread.sleep(500); + // bypass the procedure + assertTrue(procExecutor.bypassProcedure(id, 1000, true, false)); + + htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass()); + LOG.info("{} finished", proc); + } + @AfterClass public static void tearDown() throws Exception { procExecutor.stop(); @@ -188,6 +201,29 @@ public class TestProcedureBypass { } + public static class WaitingTimeoutProcedure + extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { + public 
WaitingTimeoutProcedure() { + super(); + } + + @Override + protected Procedure[] execute(final TestProcEnv env) + throws ProcedureSuspendedException { + // Always suspend the procedure + setTimeout(50000); + setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT); + skipPersistence(); + throw new ProcedureSuspendedException(); + } + + @Override + protected synchronized boolean setTimeoutFailure(TestProcEnv env) { + setState(ProcedureProtos.ProcedureState.RUNNABLE); + procExecutor.getScheduler().addFront(this); + return false; // 'false' means that this procedure handled the timeout + } + } public static class RootProcedure extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> { private boolean childSpwaned = false;