Repository: hbase
Updated Branches:
  refs/heads/branch-2.0 6584a76d3 -> e8404c7c2


HBASE-21437 Bypassed procedure throw IllegalArgumentException when its state is 
WAITING_TIMEOUT

Signed-off-by: Allan Yang <allan...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8404c7c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8404c7c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8404c7c

Branch: refs/heads/branch-2.0
Commit: e8404c7c21b5ceaddd1359037be1303171c97ae9
Parents: 6584a76
Author: jingyuntian <tianjy1...@gmail.com>
Authored: Fri Nov 9 22:45:43 2018 +0800
Committer: Allan Yang <allan...@apache.org>
Committed: Fri Nov 9 22:45:43 2018 +0800

----------------------------------------------------------------------
 .../hbase/procedure2/ProcedureExecutor.java     | 19 ++++++----
 .../hbase/procedure2/TimeoutExecutorThread.java |  2 +-
 .../hbase/procedure2/TestProcedureBypass.java   | 38 +++++++++++++++++++-
 3 files changed, 51 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e8404c7c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 3bd5e0f..a70a9ef 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -1033,15 +1033,22 @@ public class ProcedureExecutor<TEnvironment> {
         store.update(procedure);
       }
 
-      // If we don't have the lock, we can't re-submit the queue,
-      // since it is already executing. To get rid of the stuck situation, we
-      // need to restart the master. With the procedure set to bypass, the 
procedureExecutor
-      // will bypass it and won't get stuck again.
-      if (lockEntry != null) {
-        // add the procedure to run queue,
+      // If state of procedure is WAITING_TIMEOUT, we can directly submit it 
to the scheduler.
+      // Instead we should remove it from timeout Executor queue and tranfer 
its state to RUNNABLE
+      if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
+        LOG.debug("transform procedure {} from WAITING_TIMEOUT to RUNNABLE", 
procedure);
+        if (timeoutExecutor.remove(procedure)) {
+          LOG.debug("removed procedure {} from timeoutExecutor", procedure);
+          timeoutExecutor.executeTimedoutProcedure(procedure);
+        }
+      } else if (lockEntry != null) {
         scheduler.addFront(procedure);
         LOG.info("Bypassing {} and its ancestors successfully, adding to 
queue", procedure);
       } else {
+        // If we don't have the lock, we can't re-submit the queue,
+        // since it is already executing. To get rid of the stuck situation, we
+        // need to restart the master. With the procedure set to bypass, the 
procedureExecutor
+        // will bypass it and won't get stuck again.
         LOG.info("Bypassing {} and its ancestors successfully, but since it is 
already running, "
             + "skipping add to queue", procedure);
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8404c7c/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 9e050a2..4416177 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -126,7 +126,7 @@ class TimeoutExecutorThread<TEnvironment> extends 
StoppableThread {
     }
   }
 
-  private void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
+  protected void executeTimedoutProcedure(Procedure<TEnvironment> proc) {
     // The procedure received a timeout. if the procedure itself does not 
handle it,
     // call abort() and add the procedure back in the queue for rollback.
     if (proc.setTimeoutFailure(executor.getEnvironment())) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8404c7c/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java
index fa40631..de7a0a1 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureBypass.java
@@ -38,6 +38,7 @@ import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
 
 
 @Category({MasterTests.class, SmallTests.class})
@@ -80,7 +81,7 @@ public class TestProcedureBypass {
         procStore);
     procStore.start(PROCEDURE_EXECUTOR_SLOTS);
     ProcedureTestingUtility
-        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, true);
+        .initAndStartWorkers(procExecutor, PROCEDURE_EXECUTOR_SLOTS, 0, false, 
true);
   }
 
   @Test
@@ -149,6 +150,18 @@ public class TestProcedureBypass {
     LOG.info("{} finished", proc);
   }
 
+  @Test
+  public void testBypassingWaitingTimeoutProcedures() throws Exception {
+    final WaitingTimeoutProcedure proc = new WaitingTimeoutProcedure();
+    long id = procExecutor.submitProcedure(proc);
+    Thread.sleep(500);
+    // bypass the procedure
+    assertTrue(procExecutor.bypassProcedure(id, 1000, true, false));
+
+    htu.waitFor(5000, () -> proc.isSuccess() && proc.isBypass());
+    LOG.info("{} finished", proc);
+  }
+
   @AfterClass
   public static void tearDown() throws Exception {
     procExecutor.stop();
@@ -188,6 +201,29 @@ public class TestProcedureBypass {
 
   }
 
+  public static class WaitingTimeoutProcedure
+      extends ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
+    public WaitingTimeoutProcedure() {
+      super();
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env)
+        throws ProcedureSuspendedException {
+      // Always suspend the procedure
+      setTimeout(50000);
+      setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+      skipPersistence();
+      throw new ProcedureSuspendedException();
+    }
+
+    @Override
+    protected synchronized boolean setTimeoutFailure(TestProcEnv env) {
+      setState(ProcedureProtos.ProcedureState.RUNNABLE);
+      procExecutor.getScheduler().addFront(this);
+      return false; // 'false' means that this procedure handled the timeout
+    }
+  }
 
   public static class RootProcedure extends 
ProcedureTestingUtility.NoopProcedure<TestProcEnv> {
     private boolean childSpwaned = false;

Reply via email to