HBASE-19978 The keepalive logic is incomplete in ProcedureExecutor

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/391790dd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/391790dd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/391790dd

Branch: refs/heads/HBASE-19064
Commit: 391790ddb05f0db0ffd67e0c2f2f961157af7be3
Parents: a9a6eed
Author: zhangduo <zhang...@apache.org>
Authored: Mon Feb 19 08:08:06 2018 +0800
Committer: Michael Stack <st...@apache.org>
Committed: Mon Feb 19 17:13:47 2018 -0800

----------------------------------------------------------------------
 .../hbase/procedure2/DelayedProcedure.java      |  28 ++
 .../hadoop/hbase/procedure2/InlineChore.java    |  42 +++
 .../hbase/procedure2/ProcedureExecutor.java     | 284 +++++--------------
 .../hbase/procedure2/StoppableThread.java       |  54 ++++
 .../hbase/procedure2/TimeoutExecutorThread.java | 140 +++++++++
 .../org/apache/hadoop/hbase/master/HMaster.java |  11 +-
 .../master/procedure/TestProcedurePriority.java | 180 ++++++++++++
 7 files changed, 522 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
new file mode 100644
index 0000000..fcec0b7
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/DelayedProcedure.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+class DelayedProcedure extends 
DelayedUtil.DelayedContainerWithTimestamp<Procedure<?>> {
+  public DelayedProcedure(Procedure<?> procedure) {
+    super(procedure, procedure.getTimeoutTimestamp());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java
new file mode 100644
index 0000000..32b4922
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/InlineChore.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Inline Chores (executors internal chores).
+ */
+@InterfaceAudience.Private
+abstract class InlineChore extends DelayedUtil.DelayedObject implements 
Runnable {
+
+  private long timeout;
+
+  public abstract int getTimeoutInterval();
+
+  protected void refreshTimeout() {
+    this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval();
+  }
+
+  @Override
+  public long getTimeout() {
+    return timeout;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index b1ff426..af0a61b 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -18,9 +18,6 @@
 
 package org.apache.hadoop.hbase.procedure2;
 
-import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -30,35 +27,35 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.TimeUnit;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.procedure2.Procedure.LockState;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
-import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
-import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.NonceKey;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 
 /**
  * Thread Pool that executes the submitted procedures.
@@ -83,7 +80,7 @@ public class ProcedureExecutor<TEnvironment> {
 
   public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
       "hbase.procedure.worker.keep.alive.time.msec";
-  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE;
+  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = 
TimeUnit.MINUTES.toMillis(1);
 
   Testing testing = null;
   public static class Testing {
@@ -272,8 +269,9 @@ public class ProcedureExecutor<TEnvironment> {
   private CopyOnWriteArrayList<WorkerThread> workerThreads;
   private TimeoutExecutorThread timeoutExecutor;
   private int corePoolSize;
+  private int maxPoolSize;
 
-  private volatile long keepAliveTime = Long.MAX_VALUE;
+  private volatile long keepAliveTime;
 
   /**
    * Scheduler/Queue that contains runnable procedures.
@@ -501,7 +499,7 @@ public class ProcedureExecutor<TEnvironment> {
    *          a corrupted procedure is found on replay. otherwise false.
    */
   public void start(int numThreads, boolean abortOnCorruption) throws 
IOException {
-    if (running.getAndSet(true)) {
+    if (!running.compareAndSet(false, true)) {
       LOG.warn("Already running");
       return;
     }
@@ -509,13 +507,15 @@ public class ProcedureExecutor<TEnvironment> {
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
     this.corePoolSize = numThreads;
-    LOG.info("Starting {} Workers (bigger of cpus/4 or 16)", corePoolSize);
+    this.maxPoolSize = 10 * numThreads;
+    LOG.info("Starting {} core workers (bigger of cpus/4 or 16) with max 
(burst) worker count={}",
+        corePoolSize, maxPoolSize);
 
     // Create the Thread Group for the executors
     threadGroup = new ThreadGroup("PEWorkerGroup");
 
     // Create the timeout executor
-    timeoutExecutor = new TimeoutExecutorThread(threadGroup);
+    timeoutExecutor = new TimeoutExecutorThread(this, threadGroup);
 
     // Create the workers
     workerId.set(0);
@@ -530,8 +530,8 @@ public class ProcedureExecutor<TEnvironment> {
     st = EnvironmentEdgeManager.currentTime();
     store.recoverLease();
     et = EnvironmentEdgeManager.currentTime();
-    LOG.info(String.format("Recovered %s lease in %s",
-      store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
+    LOG.info("Recovered {} lease in {}", store.getClass().getSimpleName(),
+      StringUtils.humanTimeDiff(et - st));
 
     // start the procedure scheduler
     scheduler.start();
@@ -544,13 +544,11 @@ public class ProcedureExecutor<TEnvironment> {
     st = EnvironmentEdgeManager.currentTime();
     load(abortOnCorruption);
     et = EnvironmentEdgeManager.currentTime();
-    LOG.info(String.format("Loaded %s in %s, um pid=",
-      store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
+    LOG.info("Loaded {} in {}", store.getClass().getSimpleName(),
+      StringUtils.humanTimeDiff(et - st));
 
     // Start the executors. Here we must have the lastProcId set.
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Start workers " + workerThreads.size());
-    }
+    LOG.trace("Start workers {}", workerThreads.size());
     timeoutExecutor.start();
     for (WorkerThread worker: workerThreads) {
       worker.start();
@@ -645,7 +643,7 @@ public class ProcedureExecutor<TEnvironment> {
     return this.store;
   }
 
-  protected ProcedureScheduler getScheduler() {
+  ProcedureScheduler getScheduler() {
     return scheduler;
   }
 
@@ -1152,7 +1150,7 @@ public class ProcedureExecutor<TEnvironment> {
     return procedures.keySet();
   }
 
-  private Long getRootProcedureId(Procedure proc) {
+  Long getRootProcedureId(Procedure proc) {
     return Procedure.getRootProcedureId(procedures, proc);
   }
 
@@ -1699,15 +1697,23 @@ public class ProcedureExecutor<TEnvironment> {
     sendProcedureFinishedNotification(proc.getProcId());
   }
 
+  RootProcedureState getProcStack(long rootProcId) {
+    return rollbackStack.get(rootProcId);
+  }
+
   // ==========================================================================
   //  Worker Thread
   // ==========================================================================
-  private final class WorkerThread extends StoppableThread {
+  private class WorkerThread extends StoppableThread {
     private final AtomicLong executionStartTime = new 
AtomicLong(Long.MAX_VALUE);
-    private Procedure activeProcedure;
+    private volatile Procedure<?> activeProcedure;
 
-    public WorkerThread(final ThreadGroup group) {
-      super(group, "PEWorker-" + workerId.incrementAndGet());
+    public WorkerThread(ThreadGroup group) {
+      this(group, "PEWorker-");
+    }
+
+    protected WorkerThread(ThreadGroup group, String prefix) {
+      super(group, prefix + workerId.incrementAndGet());
       setDaemon(true);
     }
 
@@ -1721,34 +1727,33 @@ public class ProcedureExecutor<TEnvironment> {
       long lastUpdate = EnvironmentEdgeManager.currentTime();
       try {
         while (isRunning() && keepAlive(lastUpdate)) {
-          this.activeProcedure = scheduler.poll(keepAliveTime, 
TimeUnit.MILLISECONDS);
-          if (this.activeProcedure == null) continue;
+          Procedure<?> proc = scheduler.poll(keepAliveTime, 
TimeUnit.MILLISECONDS);
+          if (proc == null) {
+            continue;
+          }
+          this.activeProcedure = proc;
           int activeCount = activeExecutorCount.incrementAndGet();
           int runningCount = store.setRunningProcedureCount(activeCount);
-          if (LOG.isTraceEnabled()) {
-            LOG.trace("Execute pid=" + this.activeProcedure.getProcId() +
-                " runningCount=" + runningCount + ", activeCount=" + 
activeCount);
-          }
+          LOG.trace("Execute pid={} runningCount={}, activeCount={}", 
proc.getProcId(),
+            runningCount, activeCount);
           executionStartTime.set(EnvironmentEdgeManager.currentTime());
           try {
-            executeProcedure(this.activeProcedure);
+            executeProcedure(proc);
           } catch (AssertionError e) {
-            LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e);
+            LOG.info("ASSERT pid=" + proc.getProcId(), e);
             throw e;
           } finally {
             activeCount = activeExecutorCount.decrementAndGet();
             runningCount = store.setRunningProcedureCount(activeCount);
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Halt pid=" + this.activeProcedure.getProcId() +
-                  " runningCount=" + runningCount + ", activeCount=" + 
activeCount);
-            }
+            LOG.trace("Halt pid={} runningCount={}, activeCount={}", 
proc.getProcId(),
+              runningCount, activeCount);
             this.activeProcedure = null;
             lastUpdate = EnvironmentEdgeManager.currentTime();
             executionStartTime.set(Long.MAX_VALUE);
           }
         }
       } catch (Throwable t) {
-        LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t);
+        LOG.warn("Worker terminating UNNATURALLY {}", this.activeProcedure, t);
       } finally {
         LOG.trace("Worker terminated.");
       }
@@ -1768,169 +1773,23 @@ public class ProcedureExecutor<TEnvironment> {
       return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
     }
 
-    private boolean keepAlive(final long lastUpdate) {
-      if (workerThreads.size() <= corePoolSize) return true;
-      return (EnvironmentEdgeManager.currentTime() - lastUpdate) < 
keepAliveTime;
+    // core worker never timeout
+    protected boolean keepAlive(long lastUpdate) {
+      return true;
     }
   }
 
-  /**
-   * Runs task on a period such as check for stuck workers.
-   * @see InlineChore
-   */
-  private final class TimeoutExecutorThread extends StoppableThread {
-    private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
+  // A worker thread which can be added when core workers are stuck. Will 
timeout after
+  // keepAliveTime if there is no procedure to run.
+  private final class KeepAliveWorkerThread extends WorkerThread {
 
-    public TimeoutExecutorThread(final ThreadGroup group) {
-      super(group, "ProcExecTimeout");
-      setDaemon(true);
+    public KeepAliveWorkerThread(ThreadGroup group) {
+      super(group, "KeepAlivePEWorker-");
     }
 
     @Override
-    public void sendStopSignal() {
-      queue.add(DelayedUtil.DELAYED_POISON);
-    }
-
-    @Override
-    public void run() {
-      final boolean traceEnabled = LOG.isTraceEnabled();
-      while (isRunning()) {
-        final DelayedWithTimeout task = 
DelayedUtil.takeWithoutInterrupt(queue);
-        if (task == null || task == DelayedUtil.DELAYED_POISON) {
-          // the executor may be shutting down,
-          // and the task is just the shutdown request
-          continue;
-        }
-
-        if (traceEnabled) {
-          LOG.trace("Executing " + task);
-        }
-
-        // execute the task
-        if (task instanceof InlineChore) {
-          execInlineChore((InlineChore)task);
-        } else if (task instanceof DelayedProcedure) {
-          execDelayedProcedure((DelayedProcedure)task);
-        } else {
-          LOG.error("CODE-BUG unknown timeout task type " + task);
-        }
-      }
-    }
-
-    public void add(final InlineChore chore) {
-      chore.refreshTimeout();
-      queue.add(chore);
-    }
-
-    public void add(final Procedure procedure) {
-      assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
-      LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() +
-          ", timestamp=" + procedure.getTimeoutTimestamp());
-      queue.add(new DelayedProcedure(procedure));
-    }
-
-    public boolean remove(final Procedure procedure) {
-      return queue.remove(new DelayedProcedure(procedure));
-    }
-
-    private void execInlineChore(final InlineChore chore) {
-      chore.run();
-      add(chore);
-    }
-
-    private void execDelayedProcedure(final DelayedProcedure delayed) {
-      // TODO: treat this as a normal procedure, add it to the scheduler and
-      // let one of the workers handle it.
-      // Today we consider ProcedureInMemoryChore as InlineChores
-      final Procedure procedure = delayed.getObject();
-      if (procedure instanceof ProcedureInMemoryChore) {
-        executeInMemoryChore((ProcedureInMemoryChore)procedure);
-        // if the procedure is in a waiting state again, put it back in the 
queue
-        procedure.updateTimestamp();
-        if (procedure.isWaiting()) {
-          delayed.setTimeout(procedure.getTimeoutTimestamp());
-          queue.add(delayed);
-        }
-      } else {
-        executeTimedoutProcedure(procedure);
-      }
-    }
-
-    private void executeInMemoryChore(final ProcedureInMemoryChore chore) {
-      if (!chore.isWaiting()) return;
-
-      // The ProcedureInMemoryChore is a special case, and it acts as a chore.
-      // instead of bringing the Chore class in, we reuse this timeout thread 
for
-      // this special case.
-      try {
-        chore.periodicExecute(getEnvironment());
-      } catch (Throwable e) {
-        LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e);
-      }
-    }
-
-    private void executeTimedoutProcedure(final Procedure proc) {
-      // The procedure received a timeout. if the procedure itself does not 
handle it,
-      // call abort() and add the procedure back in the queue for rollback.
-      if (proc.setTimeoutFailure(getEnvironment())) {
-        long rootProcId = Procedure.getRootProcedureId(procedures, proc);
-        RootProcedureState procStack = rollbackStack.get(rootProcId);
-        procStack.abort();
-        store.update(proc);
-        scheduler.addFront(proc);
-      }
-    }
-  }
-
-  private static final class DelayedProcedure
-      extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> {
-    public DelayedProcedure(final Procedure procedure) {
-      super(procedure, procedure.getTimeoutTimestamp());
-    }
-  }
-
-  private static abstract class StoppableThread extends Thread {
-    public StoppableThread(final ThreadGroup group, final String name) {
-      super(group, name);
-    }
-
-    public abstract void sendStopSignal();
-
-    public void awaitTermination() {
-      try {
-        final long startTime = EnvironmentEdgeManager.currentTime();
-        for (int i = 0; isAlive(); ++i) {
-          sendStopSignal();
-          join(250);
-          // Log every two seconds; send interrupt too.
-          if (i > 0 && (i % 8) == 0) {
-            LOG.warn("Waiting termination of thread " + getName() + ", " +
-              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - 
startTime) +
-            "; sending interrupt");
-            interrupt();
-          }
-        }
-      } catch (InterruptedException e) {
-        LOG.warn(getName() + " join wait got interrupted", e);
-      }
-    }
-  }
-
-  // ==========================================================================
-  //  Inline Chores (executors internal chores)
-  // ==========================================================================
-  private static abstract class InlineChore extends DelayedUtil.DelayedObject 
implements Runnable {
-    private long timeout;
-
-    public abstract int getTimeoutInterval();
-
-    protected void refreshTimeout() {
-      this.timeout = EnvironmentEdgeManager.currentTime() + 
getTimeoutInterval();
-    }
-
-    @Override
-    public long getTimeout() {
-      return timeout;
+    protected boolean keepAlive(long lastUpdate) {
+      return EnvironmentEdgeManager.currentTime() - lastUpdate < keepAliveTime;
     }
   }
 
@@ -1975,32 +1834,35 @@ public class ProcedureExecutor<TEnvironment> {
     private int checkForStuckWorkers() {
       // check if any of the worker is stuck
       int stuckCount = 0;
-      for (WorkerThread worker: workerThreads) {
+      for (WorkerThread worker : workerThreads) {
         if (worker.getCurrentRunTime() < stuckThreshold) {
           continue;
         }
 
         // WARN the worker is stuck
         stuckCount++;
-        LOG.warn("Worker stuck " + worker +
-            " run time " + 
StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
+        LOG.warn("Worker stuck {} run time {}", worker,
+          StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
       }
       return stuckCount;
     }
 
     private void checkThreadCount(final int stuckCount) {
       // nothing to do if there are no runnable tasks
-      if (stuckCount < 1 || !scheduler.hasRunnables()) return;
+      if (stuckCount < 1 || !scheduler.hasRunnables()) {
+        return;
+      }
 
       // add a new thread if the worker stuck percentage exceed the threshold 
limit
       // and every handler is active.
-      final float stuckPerc = ((float)stuckCount) / workerThreads.size();
-      if (stuckPerc >= addWorkerStuckPercentage &&
-          activeExecutorCount.get() == workerThreads.size()) {
-        final WorkerThread worker = new WorkerThread(threadGroup);
+      final float stuckPerc = ((float) stuckCount) / workerThreads.size();
+      // let's add new worker thread more aggressively, as they will timeout 
finally if there is no
+      // work to do.
+      if (stuckPerc >= addWorkerStuckPercentage && workerThreads.size() < 
maxPoolSize) {
+        final KeepAliveWorkerThread worker = new 
KeepAliveWorkerThread(threadGroup);
         workerThreads.add(worker);
         worker.start();
-        LOG.debug("Added new worker thread " + worker);
+        LOG.debug("Added new worker thread {}", worker);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java
new file mode 100644
index 0000000..b58b571
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StoppableThread.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+abstract class StoppableThread extends Thread {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(StoppableThread.class);
+
+  public StoppableThread(final ThreadGroup group, final String name) {
+    super(group, name);
+  }
+
+  public abstract void sendStopSignal();
+
+  public void awaitTermination() {
+    try {
+      final long startTime = EnvironmentEdgeManager.currentTime();
+      for (int i = 0; isAlive(); ++i) {
+        sendStopSignal();
+        join(250);
+        // Log every two seconds; send interrupt too.
+        if (i > 0 && (i % 8) == 0) {
+          LOG.warn("Waiting termination of thread {}, {}; sending interrupt", 
getName(),
+            StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - 
startTime));
+          interrupt();
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.warn("{} join wait got interrupted", getName(), e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
----------------------------------------------------------------------
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
new file mode 100644
index 0000000..e5e3230
--- /dev/null
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -0,0 +1,140 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.concurrent.DelayQueue;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Runs task on a period such as check for stuck workers.
+ * @see InlineChore
+ */
+@InterfaceAudience.Private
+class TimeoutExecutorThread extends StoppableThread {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TimeoutExecutorThread.class);
+
+  private final ProcedureExecutor<?> executor;
+
+  private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>();
+
+  public TimeoutExecutorThread(ProcedureExecutor<?> executor, ThreadGroup 
group) {
+    super(group, "ProcExecTimeout");
+    setDaemon(true);
+    this.executor = executor;
+  }
+
+  @Override
+  public void sendStopSignal() {
+    queue.add(DelayedUtil.DELAYED_POISON);
+  }
+
+  @Override
+  public void run() {
+    while (executor.isRunning()) {
+      final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+      if (task == null || task == DelayedUtil.DELAYED_POISON) {
+        // the executor may be shutting down,
+        // and the task is just the shutdown request
+        continue;
+      }
+      LOG.trace("Executing {}", task);
+
+      // execute the task
+      if (task instanceof InlineChore) {
+        execInlineChore((InlineChore) task);
+      } else if (task instanceof DelayedProcedure) {
+        execDelayedProcedure((DelayedProcedure) task);
+      } else {
+        LOG.error("CODE-BUG unknown timeout task type {}", task);
+      }
+    }
+  }
+
+  public void add(InlineChore chore) {
+    chore.refreshTimeout();
+    queue.add(chore);
+  }
+
+  public void add(Procedure<?> procedure) {
+    assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
+    LOG.info("ADDED {}; timeout={}, timestamp={}", procedure, 
procedure.getTimeout(),
+      procedure.getTimeoutTimestamp());
+    queue.add(new DelayedProcedure(procedure));
+  }
+
+  public boolean remove(Procedure<?> procedure) {
+    return queue.remove(new DelayedProcedure(procedure));
+  }
+
+  private void execInlineChore(InlineChore chore) {
+    chore.run();
+    add(chore);
+  }
+
+  private void execDelayedProcedure(DelayedProcedure delayed) {
+    // TODO: treat this as a normal procedure, add it to the scheduler and
+    // let one of the workers handle it.
+    // Today we consider ProcedureInMemoryChore as InlineChores
+    Procedure<?> procedure = delayed.getObject();
+    if (procedure instanceof ProcedureInMemoryChore) {
+      executeInMemoryChore((ProcedureInMemoryChore) procedure);
+      // if the procedure is in a waiting state again, put it back in the queue
+      procedure.updateTimestamp();
+      if (procedure.isWaiting()) {
+        delayed.setTimeout(procedure.getTimeoutTimestamp());
+        queue.add(delayed);
+      }
+    } else {
+      executeTimedoutProcedure(procedure);
+    }
+  }
+
+  private void executeInMemoryChore(ProcedureInMemoryChore chore) {
+    if (!chore.isWaiting()) {
+      return;
+    }
+
+    // The ProcedureInMemoryChore is a special case, and it acts as a chore.
+    // instead of bringing the Chore class in, we reuse this timeout thread for
+    // this special case.
+    try {
+      chore.periodicExecute(executor.getEnvironment());
+    } catch (Throwable e) {
+      LOG.error("Ignoring {} exception: {}", chore, e.getMessage(), e);
+    }
+  }
+
+  private void executeTimedoutProcedure(Procedure proc) {
+    // The procedure received a timeout. if the procedure itself does not 
handle it,
+    // call abort() and add the procedure back in the queue for rollback.
+    if (proc.setTimeoutFailure(executor.getEnvironment())) {
+      long rootProcId = executor.getRootProcedureId(proc);
+      RootProcedureState procStack = executor.getProcStack(rootProcId);
+      procStack.abort();
+      executor.getStore().update(proc);
+      executor.getScheduler().addFront(proc);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index b949aa9..b639503 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -1213,11 +1213,10 @@ public class HMaster extends HRegionServer implements 
MasterServices {
     configurationManager.registerObserver(procEnv);
 
     int cpus = Runtime.getRuntime().availableProcessors();
-    final int numThreads = 
conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
-        Math.max((cpus > 0? cpus/4: 0),
-            MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
-    final boolean abortOnCorruption = conf.getBoolean(
-        MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
+    final int numThreads = 
conf.getInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, Math.max(
+      (cpus > 0 ? cpus / 4 : 0), 
MasterProcedureConstants.DEFAULT_MIN_MASTER_PROCEDURE_THREADS));
+    final boolean abortOnCorruption =
+      conf.getBoolean(MasterProcedureConstants.EXECUTOR_ABORT_ON_CORRUPTION,
         MasterProcedureConstants.DEFAULT_EXECUTOR_ABORT_ON_CORRUPTION);
     procedureStore.start(numThreads);
     procedureExecutor.start(numThreads, abortOnCorruption);
@@ -3522,7 +3521,7 @@ public class HMaster extends HRegionServer implements 
MasterServices {
   public boolean recoverMeta() throws IOException {
     ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
     LOG.info("Running RecoverMetaProcedure to ensure proper hbase:meta 
deploy.");
-    long procId = procedureExecutor.submitProcedure(new 
RecoverMetaProcedure(null, true, latch));
+    procedureExecutor.submitProcedure(new RecoverMetaProcedure(null, true, 
latch));
     latch.await();
     LOG.info("hbase:meta deployed at=" +
         getMetaTableLocator().getMetaRegionLocation(getZooKeeper()));

http://git-wip-us.apache.org/repos/asf/hbase/blob/391790dd/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
new file mode 100644
index 0000000..05d8976
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestProcedurePriority.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test to ensure that the priority for procedures and stuck checker can partially solve the problem
+ * described in HBASE-19976, that is, RecoverMetaProcedure can finally be executed within a certain
+ * period of time.
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestProcedurePriority {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestProcedurePriority.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static String TABLE_NAME_PREFIX = "TestProcedurePriority-";
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static byte[] CQ = Bytes.toBytes("cq");
+
+  private static int CORE_POOL_SIZE;
+
+  private static int TABLE_COUNT;
+
+  private static volatile boolean FAIL = false;
+
+  public static final class MyCP implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get 
get,
+        List<Cell> result) throws IOException {
+      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
+        throw new IOException("Inject error");
+      }
+    }
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put 
put, WALEdit edit,
+        Durability durability) throws IOException {
+      if (FAIL && c.getEnvironment().getRegionInfo().isMetaRegion()) {
+        throw new IOException("Inject error");
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
UTIL.getConfiguration().setLong(ProcedureExecutor.WORKER_KEEP_ALIVE_TIME_CONF_KEY,
 5000);
+    
UTIL.getConfiguration().setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS,
 4);
+    UTIL.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, 
MyCP.class.getName());
+    UTIL.startMiniCluster(3);
+    CORE_POOL_SIZE =
+      
UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor().getCorePoolSize();
+    TABLE_COUNT = 50 * CORE_POOL_SIZE;
+    List<Future<?>> futures = new ArrayList<>();
+    for (int i = 0; i < TABLE_COUNT; i++) {
+      futures.add(UTIL.getAdmin().createTableAsync(
+        TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME_PREFIX 
+ i))
+          .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)).build(),
+        null));
+    }
+    for (Future<?> future : futures) {
+      future.get(1, TimeUnit.MINUTES);
+    }
+    UTIL.getAdmin().balance(true);
+    UTIL.waitUntilNoRegionsInTransition();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    RegionServerThread rsWithMetaThread = 
UTIL.getMiniHBaseCluster().getRegionServerThreads()
+      .stream().filter(t -> 
!t.getRegionServer().getRegions(TableName.META_TABLE_NAME).isEmpty())
+      .findAny().get();
+    HRegionServer rsNoMeta = 
UTIL.getOtherRegionServer(rsWithMetaThread.getRegionServer());
+    FAIL = true;
+    UTIL.getMiniHBaseCluster().killRegionServer(rsNoMeta.getServerName());
+    // wait until all the worker threads are stuck, which means that the stuck checker will start to
+    // add new worker threads.
+    ProcedureExecutor<?> executor =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    UTIL.waitFor(60000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return executor.getWorkerThreadCount() > CORE_POOL_SIZE;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Stuck checker does not add new worker thread";
+      }
+    });
+    
UTIL.getMiniHBaseCluster().killRegionServer(rsWithMetaThread.getRegionServer().getServerName());
+    rsWithMetaThread.join();
+    FAIL = false;
+    // verify that the cluster is back
+    UTIL.waitUntilNoRegionsInTransition(60000);
+    for (int i = 0; i < TABLE_COUNT; i++) {
+      try (Table table = 
UTIL.getConnection().getTable(TableName.valueOf(TABLE_NAME_PREFIX + i))) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, 
Bytes.toBytes(i)));
+      }
+    }
+    UTIL.waitFor(30000, new ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return executor.getWorkerThreadCount() == CORE_POOL_SIZE;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "The new workers do not timeout";
+      }
+    });
+  }
+}

Reply via email to