HBASE-16846 Procedure v2 - executor cleanup

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6e9dabe
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6e9dabe
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6e9dabe

Branch: refs/heads/master
Commit: c6e9dabe625b910b3b140d9234710f6e8b968b37
Parents: c8e9a29
Author: Matteo Bertozzi <matteo.berto...@cloudera.com>
Authored: Mon Oct 17 10:23:33 2016 -0700
Committer: Matteo Bertozzi <matteo.berto...@cloudera.com>
Committed: Mon Oct 17 10:30:59 2016 -0700

----------------------------------------------------------------------
 .../procedure2/AbstractProcedureScheduler.java  |  24 +-
 .../hadoop/hbase/procedure2/Procedure.java      | 449 +++++----
 .../hbase/procedure2/ProcedureExecutor.java     | 996 ++++++++++++-------
 .../hbase/procedure2/util/DelayedUtil.java      | 150 +++
 .../procedure2/ProcedureTestingUtility.java     |   4 +-
 .../hbase/procedure2/TestProcedureExecutor.java | 171 ++++
 .../procedure2/TestProcedureInMemoryChore.java  |   7 +-
 .../hbase/procedure2/util/TestDelayedUtil.java  |  87 ++
 .../org/apache/hadoop/hbase/master/HMaster.java |   2 +
 .../master/procedure/MasterProcedureEnv.java    |   8 +-
 .../procedure/MasterProcedureScheduler.java     |   8 +-
 ...ProcedureSchedulerPerformanceEvaluation.java |  18 +-
 .../procedure/TestMasterProcedureScheduler.java |   2 +
 ...TestMasterProcedureSchedulerConcurrency.java |  11 +
 14 files changed, 1330 insertions(+), 607 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
index c4ae877..dc94983 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.procedure2;
 
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -123,17 +122,26 @@ public abstract class AbstractProcedureScheduler 
implements ProcedureScheduler {
     return poll(unit.toNanos(timeout));
   }
 
-  public Procedure poll(long nanos) {
-    final boolean waitForever = (nanos < 0);
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP")
+  public Procedure poll(final long nanos) {
     schedLock();
     try {
-      while (!queueHasRunnables()) {
-        if (!running) return null;
-        if (waitForever) {
+      if (!running) {
+        LOG.debug("the scheduler is not running");
+        return null;
+      }
+
+      if (!queueHasRunnables()) {
+        // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the 
caller
+        // to take decisions after a wake/interruption.
+        if (nanos < 0) {
           schedWaitCond.await();
         } else {
-          if (nanos <= 0) return null;
-          nanos = schedWaitCond.awaitNanos(nanos);
+          schedWaitCond.awaitNanos(nanos);
+        }
+        if (!queueHasRunnables()) {
+          nullPollCalls++;
+          return null;
         }
       }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
index eafb19a..19604e5 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java
@@ -57,26 +57,30 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public abstract class Procedure<TEnvironment> implements Comparable<Procedure> 
{
+  protected static final long NO_PROC_ID = -1;
+  protected static final int NO_TIMEOUT = -1;
+
   // unchanged after initialization
+  private NonceKey nonceKey = null;
   private String owner = null;
-  private Long parentProcId = null;
-  private Long procId = null;
+  private long parentProcId = NO_PROC_ID;
+  private long rootProcId = NO_PROC_ID;
+  private long procId = NO_PROC_ID;
   private long startTime;
 
   // runtime state, updated every operation
   private ProcedureState state = ProcedureState.INITIALIZING;
-  private Integer timeout = null;
+  private RemoteProcedureException exception = null;
   private int[] stackIndexes = null;
   private int childrenLatch = 0;
-  private long lastUpdate;
 
-  // TODO: it will be nice having pointers to allow the scheduler doing 
suspend/resume tricks
-  private boolean suspended = false;
+  private volatile int timeout = NO_TIMEOUT;
+  private volatile long lastUpdate;
 
-  private RemoteProcedureException exception = null;
-  private byte[] result = null;
+  private volatile byte[] result = null;
 
-  private NonceKey nonceKey = null;
+  // TODO: it will be nice having pointers to allow the scheduler doing 
suspend/resume tricks
+  private boolean suspended = false;
 
   /**
    * The main code of the procedure. It must be idempotent since execute()
@@ -235,13 +239,11 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
    * @return the StringBuilder
    */
   protected StringBuilder toStringSimpleSB() {
-    StringBuilder sb = new StringBuilder();
+    final StringBuilder sb = new StringBuilder();
     toStringClassDetails(sb);
 
-    if (procId != null) {
-      sb.append(" id=");
-      sb.append(getProcId());
-    }
+    sb.append(" id=");
+    sb.append(getProcId());
 
     if (hasParent()) {
       sb.append(" parent=");
@@ -256,6 +258,10 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
     sb.append(" state=");
     toStringState(sb);
 
+    if (hasException()) {
+      sb.append(" failed=" + getException());
+    }
+
     return sb;
   }
 
@@ -264,7 +270,7 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
    * details
    */
   public String toStringDetails() {
-    StringBuilder sb = toStringSimpleSB();
+    final StringBuilder sb = toStringSimpleSB();
 
     sb.append(" startTime=");
     sb.append(getStartTime());
@@ -272,7 +278,7 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
     sb.append(" lastUpdate=");
     sb.append(getLastUpdate());
 
-    int[] stackIndices = getStackIndexes();
+    final int[] stackIndices = getStackIndexes();
     if (stackIndices != null) {
       sb.append("\n");
       sb.append("stackIndexes=");
@@ -285,7 +291,6 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
   protected String toStringClass() {
     StringBuilder sb = new StringBuilder();
     toStringClassDetails(sb);
-
     return sb.toString();
   }
 
@@ -309,101 +314,180 @@ public abstract class Procedure<TEnvironment> 
implements Comparable<Procedure> {
     builder.append(getClass().getName());
   }
 
+  // ==========================================================================
+  //  These fields are unchanged after initialization.
+  //
+  //  Each procedure is created either by the user or by
+  //  ProcedureExecutor.start() during the load() phase, and is then submitted
+  //  to the executor. These fields are never changed after initialization.
+  // ==========================================================================
+  public long getProcId() {
+    return procId;
+  }
+
+  public boolean hasParent() {
+    return parentProcId != NO_PROC_ID;
+  }
+
+  public long getParentProcId() {
+    return parentProcId;
+  }
+
+  public long getRootProcId() {
+    return rootProcId;
+  }
+
+  public NonceKey getNonceKey() {
+    return nonceKey;
+  }
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public String getOwner() {
+    return owner;
+  }
+
+  public boolean hasOwner() {
+    return owner != null;
+  }
+
   /**
-   * @return the serialized result if any, otherwise null
+   * Called by the ProcedureExecutor to assign the ID to the newly created 
procedure.
    */
-  public byte[] getResult() {
-    return result;
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected void setProcId(final long procId) {
+    this.procId = procId;
+    this.startTime = EnvironmentEdgeManager.currentTime();
+    setState(ProcedureState.RUNNABLE);
   }
 
   /**
-   * The procedure may leave a "result" on completion.
-   * @param result the serialized result that will be passed to the client
+   * Called by the ProcedureExecutor to assign the parent to the newly created 
procedure.
    */
-  protected void setResult(final byte[] result) {
-    this.result = result;
+  @InterfaceAudience.Private
+  protected void setParentProcId(final long parentProcId) {
+    this.parentProcId = parentProcId;
   }
 
-  public long getProcId() {
-    return procId;
+  @InterfaceAudience.Private
+  protected void setRootProcId(final long rootProcId) {
+    this.rootProcId = rootProcId;
   }
 
-  public boolean hasParent() {
-    return parentProcId != null;
+  /**
+   * Called by the ProcedureExecutor to set the value to the newly created 
procedure.
+   */
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  protected void setNonceKey(final NonceKey nonceKey) {
+    this.nonceKey = nonceKey;
   }
 
-  public boolean hasException() {
-    return exception != null;
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  public void setOwner(final String owner) {
+    this.owner = StringUtils.isEmpty(owner) ? null : owner;
+  }
+
+  /**
+   * Called on store load to initialize the Procedure internals after
+   * the creation/deserialization.
+   */
+  @InterfaceAudience.Private
+  protected void setStartTime(final long startTime) {
+    this.startTime = startTime;
+  }
+
+  // ==========================================================================
+  //  runtime state - timeout related
+  // ==========================================================================
+  /**
+   * @param timeout timeout interval in msec
+   */
+  protected void setTimeout(final int timeout) {
+    this.timeout = timeout;
   }
 
   public boolean hasTimeout() {
-    return timeout != null;
+    return timeout != NO_TIMEOUT;
   }
 
-  public long getParentProcId() {
-    return parentProcId.longValue();
+  /**
+   * @return the timeout in msec
+   */
+  public int getTimeout() {
+    return timeout;
   }
 
-  public NonceKey getNonceKey() {
-    return nonceKey;
+  /**
+   * Called on store load to initialize the Procedure internals after
+   * the creation/deserialization.
+   */
+  @InterfaceAudience.Private
+  protected void setLastUpdate(final long lastUpdate) {
+    this.lastUpdate = lastUpdate;
   }
 
   /**
-   * @return true if the procedure is in a RUNNABLE state.
+   * Called by ProcedureExecutor after each time a procedure step is executed.
    */
-  protected synchronized boolean isRunnable() {
-    return state == ProcedureState.RUNNABLE;
+  @InterfaceAudience.Private
+  protected void updateTimestamp() {
+    this.lastUpdate = EnvironmentEdgeManager.currentTime();
   }
 
-  public synchronized boolean isInitializing() {
-    return state == ProcedureState.INITIALIZING;
+  public long getLastUpdate() {
+    return lastUpdate;
   }
 
   /**
-   * @return true if the procedure has failed.
-   *         true may mean failed but not yet rolledback or failed and 
rolledback.
+   * Timestamp of the next timeout.
+   * Called by the ProcedureExecutor if the procedure has timeout set and
+   * the procedure is in the waiting queue.
+   * @return the timestamp of the next timeout.
    */
-  public synchronized boolean isFailed() {
-    return exception != null || state == ProcedureState.ROLLEDBACK;
+  @InterfaceAudience.Private
+  protected long getTimeoutTimestamp() {
+    return getLastUpdate() + getTimeout();
   }
 
+  // ==========================================================================
+  //  runtime state
+  // ==========================================================================
   /**
-   * @return true if the procedure is finished successfully.
+   * @return the time elapsed between the last update and the start time of 
the procedure.
    */
-  public synchronized boolean isSuccess() {
-    return state == ProcedureState.FINISHED && exception == null;
+  public long elapsedTime() {
+    return getLastUpdate() - getStartTime();
   }
 
   /**
-   * @return true if the procedure is finished. The Procedure may be completed
-   *         successfuly or failed and rolledback.
+   * @return the serialized result if any, otherwise null
    */
-  public synchronized boolean isFinished() {
-    switch (state) {
-      case ROLLEDBACK:
-        return true;
-      case FINISHED:
-        return exception == null;
-      default:
-        break;
-    }
-    return false;
+  public byte[] getResult() {
+    return result;
   }
 
   /**
-   * @return true if the procedure is waiting for a child to finish or for an 
external event.
+   * The procedure may leave a "result" on completion.
+   * @param result the serialized result that will be passed to the client
    */
-  public synchronized boolean isWaiting() {
-    switch (state) {
-      case WAITING:
-      case WAITING_TIMEOUT:
-        return true;
-      default:
-        break;
-    }
-    return false;
+  protected void setResult(final byte[] result) {
+    this.result = result;
   }
 
+  // 
==============================================================================================
+  //  Runtime state, updated every operation by the ProcedureExecutor
+  //
+  //  There is always 1 thread at a time operating on the state of the procedure.
+  //  The ProcedureExecutor may check and set states, or some Procedure may
+  //  update its own state, but there are no concurrent updates. We use synchronized here
+  //  just because the procedure can get scheduled on different executor threads on each step.
+  // 
==============================================================================================
+
   /**
    * @return true if the procedure is in a suspended state,
    *         waiting for the resources required to execute the procedure will 
become available.
@@ -421,55 +505,60 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
     suspended = false;
   }
 
-  public synchronized RemoteProcedureException getException() {
-    return exception;
-  }
-
-  public long getStartTime() {
-    return startTime;
-  }
-
-  public synchronized long getLastUpdate() {
-    return lastUpdate;
+  /**
+   * @return true if the procedure is in a RUNNABLE state.
+   */
+  protected synchronized boolean isRunnable() {
+    return state == ProcedureState.RUNNABLE;
   }
 
-  public synchronized long elapsedTime() {
-    return lastUpdate - startTime;
+  public synchronized boolean isInitializing() {
+    return state == ProcedureState.INITIALIZING;
   }
 
   /**
-   * @param timeout timeout in msec
+   * @return true if the procedure has failed.
+   *         true may mean failed but not yet rolledback or failed and 
rolledback.
    */
-  protected void setTimeout(final int timeout) {
-    this.timeout = timeout;
+  public synchronized boolean isFailed() {
+    return exception != null || state == ProcedureState.ROLLEDBACK;
   }
 
   /**
-   * @return the timeout in msec
+   * @return true if the procedure is finished successfully.
    */
-  public int getTimeout() {
-    return timeout.intValue();
+  public synchronized boolean isSuccess() {
+    return state == ProcedureState.FINISHED && exception == null;
   }
 
   /**
-   * @return the remaining time before the timeout
+   * @return true if the procedure is finished. The Procedure may be completed
+   *         successfully or failed and rolled back.
    */
-  public long getTimeRemaining() {
-    return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - 
startTime));
-  }
-
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  public void setOwner(final String owner) {
-    this.owner = StringUtils.isEmpty(owner) ? null : owner;
-  }
-
-  public String getOwner() {
-    return owner;
+  public synchronized boolean isFinished() {
+    switch (state) {
+      case ROLLEDBACK:
+        return true;
+      case FINISHED:
+        return exception == null;
+      default:
+        break;
+    }
+    return false;
   }
 
-  public boolean hasOwner() {
-    return owner != null;
+  /**
+   * @return true if the procedure is waiting for a child to finish or for an 
external event.
+   */
+  public synchronized boolean isWaiting() {
+    switch (state) {
+      case WAITING:
+      case WAITING_TIMEOUT:
+        return true;
+      default:
+        break;
+    }
+    return false;
   }
 
   @VisibleForTesting
@@ -514,101 +603,12 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
     return false;
   }
 
-  /**
-   * Called by the ProcedureExecutor to assign the ID to the newly created 
procedure.
-   */
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  protected void setProcId(final long procId) {
-    this.procId = procId;
-    this.startTime = EnvironmentEdgeManager.currentTime();
-    setState(ProcedureState.RUNNABLE);
-  }
-
-  /**
-   * Called by the ProcedureExecutor to assign the parent to the newly created 
procedure.
-   */
-  @InterfaceAudience.Private
-  protected void setParentProcId(final long parentProcId) {
-    this.parentProcId = parentProcId;
-  }
-
-  /**
-   * Called by the ProcedureExecutor to set the value to the newly created 
procedure.
-   */
-  @VisibleForTesting
-  @InterfaceAudience.Private
-  protected void setNonceKey(final NonceKey nonceKey) {
-    this.nonceKey = nonceKey;
-  }
-
-  /**
-   * Internal method called by the ProcedureExecutor that starts the
-   * user-level code execute().
-   */
-  @InterfaceAudience.Private
-  protected Procedure[] doExecute(final TEnvironment env)
-      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
-    try {
-      updateTimestamp();
-      return execute(env);
-    } finally {
-      updateTimestamp();
-    }
-  }
-
-  /**
-   * Internal method called by the ProcedureExecutor that starts the
-   * user-level code rollback().
-   */
-  @InterfaceAudience.Private
-  protected void doRollback(final TEnvironment env)
-      throws IOException, InterruptedException {
-    try {
-      updateTimestamp();
-      rollback(env);
-    } finally {
-      updateTimestamp();
-    }
-  }
-
-  /**
-   * Internal method called by the ProcedureExecutor that starts the
-   * user-level code acquireLock().
-   */
-  @InterfaceAudience.Private
-  protected boolean doAcquireLock(final TEnvironment env) {
-    return acquireLock(env);
-  }
-
-  /**
-   * Internal method called by the ProcedureExecutor that starts the
-   * user-level code releaseLock().
-   */
-  @InterfaceAudience.Private
-  protected void doReleaseLock(final TEnvironment env) {
-    releaseLock(env);
-  }
-
-  /**
-   * Called on store load to initialize the Procedure internals after
-   * the creation/deserialization.
-   */
-  @InterfaceAudience.Private
-  protected void setStartTime(final long startTime) {
-    this.startTime = startTime;
-  }
-
-  /**
-   * Called on store load to initialize the Procedure internals after
-   * the creation/deserialization.
-   */
-  protected synchronized void setLastUpdate(final long lastUpdate) {
-    this.lastUpdate = lastUpdate;
+  public synchronized boolean hasException() {
+    return exception != null;
   }
 
-  protected synchronized void updateTimestamp() {
-    this.lastUpdate = EnvironmentEdgeManager.currentTime();
+  public synchronized RemoteProcedureException getException() {
+    return exception;
   }
 
   /**
@@ -629,8 +629,7 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
   }
 
   /**
-   * Called by the ProcedureExecutor to notify that one of the sub-procedures
-   * has completed.
+   * Called by the ProcedureExecutor to notify that one of the sub-procedures 
has completed.
    */
   @InterfaceAudience.Private
   protected synchronized boolean childrenCountDown() {
@@ -643,6 +642,7 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
     return childrenLatch > 0;
   }
 
+  @InterfaceAudience.Private
   protected synchronized int getChildrenLatch() {
     return childrenLatch;
   }
@@ -695,12 +695,63 @@ public abstract class Procedure<TEnvironment> implements 
Comparable<Procedure> {
     return stackIndexes;
   }
 
+  // ==========================================================================
+  //  Internal methods - called by the ProcedureExecutor
+  // ==========================================================================
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the 
user-level code execute().
+   */
+  @InterfaceAudience.Private
+  protected Procedure[] doExecute(final TEnvironment env)
+      throws ProcedureYieldException, ProcedureSuspendedException, 
InterruptedException {
+    try {
+      updateTimestamp();
+      return execute(env);
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the 
user-level code rollback().
+   */
+  @InterfaceAudience.Private
+  protected void doRollback(final TEnvironment env)
+      throws IOException, InterruptedException {
+    try {
+      updateTimestamp();
+      rollback(env);
+    } finally {
+      updateTimestamp();
+    }
+  }
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the 
user-level code acquireLock().
+   */
+  @InterfaceAudience.Private
+  protected boolean doAcquireLock(final TEnvironment env) {
+    return acquireLock(env);
+  }
+
+  /**
+   * Internal method called by the ProcedureExecutor that starts the 
user-level code releaseLock().
+   */
+  @InterfaceAudience.Private
+  protected void doReleaseLock(final TEnvironment env) {
+    releaseLock(env);
+  }
+
   @Override
   public int compareTo(final Procedure other) {
-    long diff = getProcId() - other.getProcId();
-    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
+    return Long.compare(getProcId(), other.getProcId());
   }
 
+  // ==========================================================================
+  //  misc utils
+  // ==========================================================================
+
   /**
    * Get an hashcode for the specified Procedure ID
    * @return the hashcode for the specified procId

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
index 4976ea0..f167f4a 100644
--- 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java
@@ -24,20 +24,16 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -50,6 +46,8 @@ import 
org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
 import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
 import 
org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
+import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
 import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
 import org.apache.hadoop.hbase.security.User;
@@ -75,6 +73,13 @@ import org.apache.hadoop.hbase.util.Pair;
 public class ProcedureExecutor<TEnvironment> {
   private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class);
 
+  public static final String CHECK_OWNER_SET_CONF_KEY = 
"hbase.procedure.check.owner.set";
+  private static final boolean DEFAULT_CHECK_OWNER_SET = false;
+
+  public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY =
+      "hbase.procedure.worker.keep.alive.time.msec";
+  private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE;
+
   Testing testing = null;
   public static class Testing {
     protected boolean killBeforeStoreUpdate = false;
@@ -97,62 +102,6 @@ public class ProcedureExecutor<TEnvironment> {
   }
 
   /**
-   * Used by the DelayQueue to get the timeout interval of the procedure
-   */
-  private static class DelayedContainer implements Delayed {
-    static final DelayedContainer POISON = new DelayedContainer();
-
-    /** null if poison */
-    final Procedure proc;
-    final long timeoutTime;
-
-    DelayedContainer(Procedure proc) {
-      assert proc != null;
-      this.proc = proc;
-      this.timeoutTime = proc.getLastUpdate() + proc.getTimeout();
-    }
-
-    DelayedContainer() {
-      this.proc = null;
-      this.timeoutTime = Long.MIN_VALUE;
-    }
-
-    @Override
-    public long getDelay(TimeUnit unit) {
-      long currentTime = EnvironmentEdgeManager.currentTime();
-      if (currentTime >= timeoutTime) {
-        return 0;
-      }
-      return unit.convert(timeoutTime - currentTime, TimeUnit.MICROSECONDS);
-    }
-
-    /**
-     * @throws NullPointerException {@inheritDoc}
-     * @throws ClassCastException {@inheritDoc}
-     */
-    @Override
-    public int compareTo(Delayed o) {
-      return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (obj == this) {
-        return true;
-      }
-      if (! (obj instanceof DelayedContainer)) {
-        return false;
-      }
-      return Objects.equals(proc, ((DelayedContainer)obj).proc);
-    }
-
-    @Override
-    public int hashCode() {
-      return proc != null ? proc.hashCode() : 0;
-    }
-  }
-
-  /**
    * Internal cleaner that removes the completed procedure results after a TTL.
    * NOTE: This is a special case handled in timeoutLoop().
    *
@@ -186,7 +135,7 @@ public class ProcedureExecutor<TEnvironment> {
     private final Map<Long, ProcedureInfo> completed;
     private final Map<NonceKey, Long> nonceKeysToProcIdsMap;
     private final ProcedureStore store;
-    private final Configuration conf;
+    private Configuration conf;
 
     public CompletedProcedureCleaner(final Configuration conf, final 
ProcedureStore store,
         final Map<Long, ProcedureInfo> completedMap,
@@ -274,39 +223,33 @@ public class ProcedureExecutor<TEnvironment> {
    * Helper map to lookup whether the procedure already issued from the same 
client.
    * This map contains every root procedure.
    */
-  private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
+  private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap =
       new ConcurrentHashMap<NonceKey, Long>();
 
-  /**
-   * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state
-   * or periodic procedures.
-   */
-  private final DelayQueue<DelayedContainer> waitingTimeout =
-    new DelayQueue<DelayedContainer>();
+  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
+    new CopyOnWriteArrayList<ProcedureExecutorListener>();
+
+  private Configuration conf;
+  private ThreadGroup threadGroup;
+  private CopyOnWriteArrayList<WorkerThread> workerThreads;
+  private TimeoutExecutorThread timeoutExecutor;
+  private int corePoolSize;
+
+  private volatile long keepAliveTime = Long.MAX_VALUE;
 
   /**
    * Scheduler/Queue that contains runnable procedures.
    */
   private final ProcedureScheduler scheduler;
 
-  // TODO
-  private final ReentrantLock submitLock = new ReentrantLock();
   private final AtomicLong lastProcId = new AtomicLong(-1);
-
-  private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners =
-    new CopyOnWriteArrayList<ProcedureExecutorListener>();
-
+  private final AtomicLong workerId = new AtomicLong(0);
   private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
   private final AtomicBoolean running = new AtomicBoolean(false);
   private final TEnvironment environment;
   private final ProcedureStore store;
-  private final Configuration conf;
-
-  private static final String CHECK_OWNER_SET_CONF_KEY = 
"hbase.procedure.check.owner.set";
   private final boolean checkOwnerSet;
 
-  private Thread[] threads;
-
   public ProcedureExecutor(final Configuration conf, final TEnvironment 
environment,
       final ProcedureStore store) {
     this(conf, environment, store, new SimpleProcedureScheduler());
@@ -318,15 +261,15 @@ public class ProcedureExecutor<TEnvironment> {
     this.scheduler = scheduler;
     this.store = store;
     this.conf = conf;
-    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, true);
+    this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, 
DEFAULT_CHECK_OWNER_SET);
+    refreshConfiguration(conf);
   }
 
   private void load(final boolean abortOnCorruption) throws IOException {
-    Preconditions.checkArgument(completed.isEmpty());
-    Preconditions.checkArgument(rollbackStack.isEmpty());
-    Preconditions.checkArgument(procedures.isEmpty());
-    Preconditions.checkArgument(waitingTimeout.isEmpty());
-    Preconditions.checkArgument(scheduler.size() == 0);
+    Preconditions.checkArgument(completed.isEmpty(), "completed not empty");
+    Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not 
empty");
+    Preconditions.checkArgument(procedures.isEmpty(), "procedure map not 
empty");
+    Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty");
 
     store.load(new ProcedureStore.ProcedureLoader() {
       @Override
@@ -435,6 +378,7 @@ public class ProcedureExecutor<TEnvironment> {
       RootProcedureState procStack = rollbackStack.get(rootProcId);
       procStack.loadStack(proc);
 
+      proc.setRootProcId(rootProcId);
       switch (proc.getState()) {
         case RUNNABLE:
           runnableList.add(proc);
@@ -526,26 +470,21 @@ public class ProcedureExecutor<TEnvironment> {
 
     // We have numThreads executor + one timer thread used for timing out
     // procedures and triggering periodic procedures.
-    threads = new Thread[numThreads + 1];
-    LOG.info("Starting procedure executor threads=" + threads.length);
-
-    // Initialize procedures executor
-    for (int i = 0; i < numThreads; ++i) {
-      threads[i] = new Thread("ProcedureExecutor-" + i) {
-        @Override
-        public void run() {
-          execLoop();
-        }
-      };
-    }
+    this.corePoolSize = numThreads;
+    LOG.info("Starting procedure executor threads=" + corePoolSize);
 
-    // Initialize procedures timeout handler (this is the +1 thread)
-    threads[numThreads] = new Thread("ProcedureExecutorTimeoutThread") {
-      @Override
-      public void run() {
-        timeoutLoop();
-      }
-    };
+    // Create the Thread Group for the executors
+    threadGroup = new ThreadGroup("ProcedureExecutor");
+
+    // Create the timeout executor
+    timeoutExecutor = new TimeoutExecutorThread(threadGroup);
+
+    // Create the workers
+    workerId.set(0);
+    workerThreads = new CopyOnWriteArrayList<WorkerThread>();
+    for (int i = 0; i < corePoolSize; ++i) {
+      workerThreads.add(new WorkerThread(threadGroup));
+    }
 
     long st, et;
 
@@ -571,10 +510,15 @@ public class ProcedureExecutor<TEnvironment> {
       store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st)));
 
     // Start the executors. Here we must have the lastProcId set.
-    for (int i = 0; i < threads.length; ++i) {
-      threads[i].start();
+    LOG.debug("start workers " + workerThreads.size());
+    timeoutExecutor.start();
+    for (WorkerThread worker: workerThreads) {
+      worker.start();
     }
 
+    // Internal chores
+    timeoutExecutor.add(new WorkerMonitor());
+
     // Add completed cleaner chore
     addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap));
   }
@@ -586,42 +530,66 @@ public class ProcedureExecutor<TEnvironment> {
 
     LOG.info("Stopping the procedure executor");
     scheduler.stop();
-    waitingTimeout.add(DelayedContainer.POISON);
+    timeoutExecutor.sendStopSignal();
   }
 
   public void join() {
-    boolean interrupted = false;
+    assert !isRunning() : "expected not running";
 
-    for (int i = 0; i < threads.length; ++i) {
-      try {
-        threads[i].join();
-      } catch (InterruptedException ex) {
-        interrupted = true;
-      }
+    // stop the timeout executor
+    timeoutExecutor.awaitTermination();
+    timeoutExecutor = null;
+
+    // stop the worker threads
+    for (WorkerThread worker: workerThreads) {
+      worker.awaitTermination();
     }
+    workerThreads = null;
 
-    if (interrupted) {
-      Thread.currentThread().interrupt();
+    // Destroy the Thread Group for the executors
+    try {
+      threadGroup.destroy();
+    } catch (IllegalThreadStateException e) {
+      LOG.error("thread group " + threadGroup + " contains running threads");
+      threadGroup.list();
+    } finally {
+      threadGroup = null;
     }
 
+    // reset the in-memory state for testing
     completed.clear();
     rollbackStack.clear();
     procedures.clear();
     nonceKeysToProcIdsMap.clear();
-    waitingTimeout.clear();
     scheduler.clear();
     lastProcId.set(-1);
   }
 
+  public void refreshConfiguration(final Configuration conf) {
+    this.conf = conf;
+    setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY,
+        DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS);
+  }
+
+  // ==========================================================================
+  //  Accessors
+  // ==========================================================================
   public boolean isRunning() {
     return running.get();
   }
 
   /**
-   * @return the number of execution threads.
+   * @return the current number of worker threads.
+   */
+  public int getWorkerThreadCount() {
+    return workerThreads.size();
+  }
+
+  /**
+   * @return the core pool size settings.
    */
-  public int getNumThreads() {
-    return threads == null ? 0 : (threads.length - 1);
+  public int getCorePoolSize() {
+    return corePoolSize;
   }
 
   public int getActiveExecutorCount() {
@@ -636,41 +604,30 @@ public class ProcedureExecutor<TEnvironment> {
     return this.store;
   }
 
-  public void registerListener(ProcedureExecutorListener listener) {
-    this.listeners.add(listener);
+  protected ProcedureScheduler getScheduler() {
+    return scheduler;
   }
 
-  public boolean unregisterListener(ProcedureExecutorListener listener) {
-    return this.listeners.remove(listener);
+  public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) {
+    this.keepAliveTime = timeUnit.toMillis(keepAliveTime);
+    this.scheduler.signalAll();
   }
 
-  /**
-   * List procedures.
-   * @return the procedures in a list
-   */
-  public List<ProcedureInfo> listProcedures() {
-    final List<ProcedureInfo> procedureLists =
-        new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
-    for (Map.Entry<Long, Procedure> p: procedures.entrySet()) {
-      procedureLists.add(ProcedureUtil.convertToProcedureInfo(p.getValue()));
-    }
-    for (Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
-      // Note: The procedure could show up twice in the list with different state, as
-      // it could complete after we walk through procedures list and insert into
-      // procedureList - it is ok, as we will use the information in the ProcedureInfo
-      // to figure it out; to prevent this would increase the complexity of the logic.
-      procedureLists.add(e.getValue());
-    }
-    return procedureLists;
+  public long getKeepAliveTime(final TimeUnit timeUnit) {
+    return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS);
   }
 
+  // ==========================================================================
+  //  Submit/Remove Chores
+  // ==========================================================================
+
   /**
    * Add a chore procedure to the executor
    * @param chore the chore to add
    */
   public void addChore(final ProcedureInMemoryChore chore) {
-    chore.setState(ProcedureState.RUNNABLE);
-    waitingTimeout.add(new DelayedContainer(chore));
+    chore.setState(ProcedureState.WAITING_TIMEOUT);
+    timeoutExecutor.add(chore);
   }
 
   /**
@@ -680,9 +637,13 @@ public class ProcedureExecutor<TEnvironment> {
    */
   public boolean removeChore(final ProcedureInMemoryChore chore) {
     chore.setState(ProcedureState.FINISHED);
-    return waitingTimeout.remove(new DelayedContainer(chore));
+    return timeoutExecutor.remove(chore);
   }
 
+  // ==========================================================================
+  //  Submit/Abort Procedure
+  // ==========================================================================
+
   /**
    * Add a new root-procedure to the executor.
    * @param proc the new procedure to execute.
@@ -699,20 +660,17 @@ public class ProcedureExecutor<TEnvironment> {
    * @param nonce
    * @return the procedure id, that can be used to monitor the operation
    */
-  public long submitProcedure(
-      final Procedure proc,
-      final long nonceGroup,
-      final long nonce) {
-    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
-    Preconditions.checkArgument(isRunning());
+  public long submitProcedure(final Procedure proc, final long nonceGroup, final long nonce) {
     Preconditions.checkArgument(lastProcId.get() >= 0);
-    Preconditions.checkArgument(!proc.hasParent());
+    Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING);
+    Preconditions.checkArgument(isRunning(), "executor not running");
+    Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc);
     if (this.checkOwnerSet) {
-      Preconditions.checkArgument(proc.hasOwner());
+      Preconditions.checkArgument(proc.hasOwner(), "missing owner");
     }
 
     // Initialize the Procedure ID
-    long currentProcId = nextProcId();
+    final long currentProcId = nextProcId();
     proc.setProcId(currentProcId);
 
     // Check whether the proc exists.  If exist, just return the proc id.
@@ -747,6 +705,41 @@ public class ProcedureExecutor<TEnvironment> {
     return currentProcId;
   }
 
+  /**
+   * Send an abort notification the specified procedure.
+   * Depending on the procedure implementation the abort can be considered or ignored.
+   * @param procId the procedure to abort
+   * @return true if the procedure exist and has received the abort, otherwise false.
+   */
+  public boolean abort(final long procId) {
+    return abort(procId, true);
+  }
+
+  /**
+   * Send an abort notification the specified procedure.
+   * Depending on the procedure implementation the abort can be considered or ignored.
+   * @param procId the procedure to abort
+   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
+   * @return true if the procedure exist and has received the abort, otherwise false.
+   */
+  public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
+    final Procedure proc = procedures.get(procId);
+    if (proc != null) {
+      if (!mayInterruptIfRunning && proc.wasExecuted()) {
+        return false;
+      }
+      return proc.abort(getEnvironment());
+    }
+    return false;
+  }
+
+  // ==========================================================================
+  //  Executor query helpers
+  // ==========================================================================
+  public Procedure getProcedure(final long procId) {
+    return procedures.get(procId);
+  }
+
   public ProcedureInfo getResult(final long procId) {
     return completed.get(procId);
   }
@@ -768,7 +761,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @return true if the procedure execution is started, otherwise false.
    */
   public boolean isStarted(final long procId) {
-    Procedure proc = procedures.get(procId);
+    final Procedure proc = procedures.get(procId);
     if (proc == null) {
       return completed.get(procId) != null;
     }
@@ -780,7 +773,7 @@ public class ProcedureExecutor<TEnvironment> {
    * @param procId the ID of the procedure to remove
    */
   public void removeResult(final long procId) {
-    ProcedureInfo result = completed.get(procId);
+    final ProcedureInfo result = completed.get(procId);
     if (result == null) {
       assert !procedures.containsKey(procId) : "procId=" + procId + " is still running";
       if (LOG.isDebugEnabled()) {
@@ -793,33 +786,16 @@ public class ProcedureExecutor<TEnvironment> {
     result.setClientAckTime(EnvironmentEdgeManager.currentTime());
   }
 
-  /**
-   * Send an abort notification the specified procedure.
-   * Depending on the procedure implementation the abort can be considered or ignored.
-   * @param procId the procedure to abort
-   * @return true if the procedure exist and has received the abort, otherwise false.
-   */
-  public boolean abort(final long procId) {
-    return abort(procId, true);
-  }
-
-  /**
-   * Send an abort notification the specified procedure.
-   * Depending on the procedure implementation the abort can be considered or ignored.
-   * @param procId the procedure to abort
-   * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted?
-   * @return true if the procedure exist and has received the abort, otherwise false.
-   */
-  public boolean abort(final long procId, final boolean mayInterruptIfRunning) {
-    Procedure proc = procedures.get(procId);
-    if (proc != null) {
-      if (!mayInterruptIfRunning && proc.wasExecuted()) {
-        return false;
-      } else {
-        return proc.abort(getEnvironment());
+  public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) {
+    ProcedureInfo result = completed.get(procId);
+    Procedure proc = null;
+    if (result == null) {
+      proc = procedures.get(procId);
+      if (proc == null) {
+        result = completed.get(procId);
       }
     }
-    return false;
+    return new Pair(result, proc);
   }
 
   /**
@@ -830,15 +806,14 @@ public class ProcedureExecutor<TEnvironment> {
    *   false otherwise or the owner is unknown.
    */
   public boolean isProcedureOwner(final long procId, final User user) {
-    if (user == null) {
-      return false;
-    }
+    if (user == null) return false;
 
-    Procedure proc = procedures.get(procId);
+    final Procedure proc = procedures.get(procId);
     if (proc != null) {
       return proc.getOwner().equals(user.getShortName());
     }
-    ProcedureInfo procInfo = completed.get(procId);
+
+    final ProcedureInfo procInfo = completed.get(procId);
     if (procInfo == null) {
       // Procedure either does not exist or has already completed and got cleaned up.
       // At this time, we cannot check the owner of the procedure
@@ -847,50 +822,113 @@ public class ProcedureExecutor<TEnvironment> {
     return ProcedureInfo.isProcedureOwner(procInfo, user);
   }
 
-  public Map<Long, ProcedureInfo> getResults() {
-    return Collections.unmodifiableMap(completed);
+  /**
+   * List procedures.
+   * @return the procedures in a list
+   */
+  public List<ProcedureInfo> listProcedures() {
+    final List<ProcedureInfo> procedureLists =
+        new ArrayList<ProcedureInfo>(procedures.size() + completed.size());
+    for (Map.Entry<Long, Procedure> p: procedures.entrySet()) {
+      procedureLists.add(ProcedureUtil.convertToProcedureInfo(p.getValue()));
+    }
+    for (Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) {
+      // Note: The procedure could show up twice in the list with different state, as
+      // it could complete after we walk through procedures list and insert into
+      // procedureList - it is ok, as we will use the information in the ProcedureInfo
+      // to figure it out; to prevent this would increase the complexity of the logic.
+      procedureLists.add(e.getValue());
+    }
+    return procedureLists;
   }
 
-  public Procedure getProcedure(final long procId) {
-    return procedures.get(procId);
+  // ==========================================================================
+  //  Listeners helpers
+  // ==========================================================================
+  public void registerListener(ProcedureExecutorListener listener) {
+    this.listeners.add(listener);
   }
 
-  protected ProcedureScheduler getScheduler() {
-    return scheduler;
+  public boolean unregisterListener(ProcedureExecutorListener listener) {
+    return this.listeners.remove(listener);
   }
 
-  /**
-   * Execution loop (N threads)
-   * while the executor is in a running state,
-   * fetch a procedure from the scheduler queue and start the execution.
-   */
-  private void execLoop() {
-    while (isRunning()) {
-      Procedure proc = scheduler.poll();
-      if (proc == null) continue;
+  private void sendProcedureLoadedNotification(final long procId) {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureExecutorListener listener: this.listeners) {
+        try {
+          listener.procedureLoaded(procId);
+        } catch (Throwable e) {
+          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+        }
+      }
+    }
+  }
 
-      try {
-        activeExecutorCount.incrementAndGet();
-        execLoop(proc);
-      } finally {
-        activeExecutorCount.decrementAndGet();
+  private void sendProcedureAddedNotification(final long procId) {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureExecutorListener listener: this.listeners) {
+        try {
+          listener.procedureAdded(procId);
+        } catch (Throwable e) {
+          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+        }
       }
     }
   }
 
-  private void execLoop(Procedure proc) {
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Trying to start the execution of " + proc);
+  private void sendProcedureFinishedNotification(final long procId) {
+    if (!this.listeners.isEmpty()) {
+      for (ProcedureExecutorListener listener: this.listeners) {
+        try {
+          listener.procedureFinished(procId);
+        } catch (Throwable e) {
+          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
+        }
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  Procedure IDs helpers
+  // ==========================================================================
+  private long nextProcId() {
+    long procId = lastProcId.incrementAndGet();
+    if (procId < 0) {
+      while (!lastProcId.compareAndSet(procId, 0)) {
+        procId = lastProcId.get();
+        if (procId >= 0)
+          break;
+      }
+      while (procedures.containsKey(procId)) {
+        procId = lastProcId.incrementAndGet();
+      }
     }
+    assert procId >= 0 : "Invalid procId " + procId;
+    return procId;
+  }
+
+  @VisibleForTesting
+  protected long getLastProcId() {
+    return lastProcId.get();
+  }
 
-    Long rootProcId = getRootProcedureId(proc);
+  private Long getRootProcedureId(Procedure proc) {
+    return Procedure.getRootProcedureId(procedures, proc);
+  }
+
+  // ==========================================================================
+  //  Executions
+  // ==========================================================================
+  private void executeProcedure(final Procedure proc) {
+    final Long rootProcId = getRootProcedureId(proc);
     if (rootProcId == null) {
       // The 'proc' was ready to run but the root procedure was rolledback
       executeRollback(proc);
       return;
     }
 
-    RootProcedureState procStack = rollbackStack.get(rootProcId);
+    final RootProcedureState procStack = rollbackStack.get(rootProcId);
     if (procStack == null) return;
 
     do {
@@ -967,56 +1005,6 @@ public class ProcedureExecutor<TEnvironment> {
     }
   }
 
-  private void timeoutLoop() {
-    while (isRunning()) {
-      Procedure proc;
-      try {
-        proc = waitingTimeout.take().proc;
-      } catch (InterruptedException e) {
-        // Just consume the interruption.
-        continue;
-      }
-      if (proc == null) { // POISON to stop
-        break;
-      }
-
-      // ----------------------------------------------------------------------------
-      // TODO-MAYBE: Should we provide a notification to the store with the
-      // full set of procedures pending and completed to write a compacted
-      // version of the log (in case is a log)?
-      // In theory no, procedures are have a short life, so at some point the store
-      // will have the tracker saying everything is in the last log.
-      // ----------------------------------------------------------------------------
-
-      // The ProcedureInMemoryChore is a special case, and it acts as a chore.
-      // instead of bringing the Chore class in, we reuse this timeout thread for
-      // this special case.
-      if (proc instanceof ProcedureInMemoryChore) {
-        if (proc.isRunnable()) {
-          try {
-            ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment());
-          } catch (Throwable e) {
-            LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e);
-          }
-          proc.updateTimestamp();
-          if (proc.isRunnable()) waitingTimeout.add(new DelayedContainer(proc));
-        }
-        continue;
-      }
-
-      // The procedure received a timeout. if the procedure itself does not handle it,
-      // call abort() and add the procedure back in the queue for rollback.
-      if (proc.setTimeoutFailure(getEnvironment())) {
-        long rootProcId = Procedure.getRootProcedureId(procedures, proc);
-        RootProcedureState procStack = rollbackStack.get(rootProcId);
-        procStack.abort();
-        store.update(proc);
-        scheduler.addFront(proc);
-        continue;
-      }
-    }
-  }
-
   /**
    * Execute the rollback of the full procedure stack.
    * Once the procedure is rolledback, the root-procedure will be visible as
@@ -1189,38 +1177,10 @@ public class ProcedureExecutor<TEnvironment> {
             reExecute = true;
           } else {
             // yield the current procedure, and make the subprocedure runnable
-            for (int i = 0; i < subprocs.length; ++i) {
-              Procedure subproc = subprocs[i];
-              if (subproc == null) {
-                String msg = "subproc[" + i + "] is null, aborting the procedure";
-                procedure.setFailure(new RemoteProcedureException(msg,
-                  new IllegalArgumentIOException(msg)));
-                subprocs = null;
-                break;
-              }
-
-              assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
-              subproc.setParentProcId(procedure.getProcId());
-              subproc.setProcId(nextProcId());
-              procStack.addSubProcedure(subproc);
-            }
-
-            if (!procedure.isFailed()) {
-              procedure.setChildrenLatch(subprocs.length);
-              switch (procedure.getState()) {
-                case RUNNABLE:
-                  procedure.setState(ProcedureState.WAITING);
-                  break;
-                case WAITING_TIMEOUT:
-                  waitingTimeout.add(new DelayedContainer(procedure));
-                  break;
-                default:
-                  break;
-              }
-            }
+            subprocs = initializeChildren(procStack, procedure, subprocs);
           }
         } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) {
-          waitingTimeout.add(new DelayedContainer(procedure));
+          timeoutExecutor.add(procedure);
         } else if (!isSuspended) {
           // No subtask, so we are done
           procedure.setState(ProcedureState.FINISHED);
@@ -1232,8 +1192,8 @@ public class ProcedureExecutor<TEnvironment> {
 
       // allows to kill the executor before something is stored to the wal.
       // useful to test the procedure recovery.
-      if (testing != null && testing.shouldKillBeforeStoreUpdate()) {
-        LOG.debug("TESTING: Kill before store update");
+      if (testing != null && !isSuspended && testing.shouldKillBeforeStoreUpdate()) {
+        LOG.debug("TESTING: Kill before store update: " + procedure);
         stop();
         return;
       }
@@ -1242,12 +1202,10 @@ public class ProcedureExecutor<TEnvironment> {
       updateStoreOnExec(procStack, procedure, subprocs);
 
       // if the store is not running we are aborting
-      if (!store.isRunning()) {
-        return;
-      }
+      if (!store.isRunning()) return;
 
       // if the procedure is kind enough to pass the slot to someone else, yield
-      if (procedure.getState() == ProcedureState.RUNNABLE &&
+      if (procedure.isRunnable() && !isSuspended &&
           procedure.isYieldAfterExecutionStep(getEnvironment())) {
         scheduler.yield(procedure);
         return;
@@ -1258,34 +1216,81 @@ public class ProcedureExecutor<TEnvironment> {
 
     // Submit the new subprocedures
     if (subprocs != null && !procedure.isFailed()) {
-      for (int i = 0; i < subprocs.length; ++i) {
-        Procedure subproc = subprocs[i];
-        assert !procedures.containsKey(subproc.getProcId());
-        procedures.put(subproc.getProcId(), subproc);
-        scheduler.addFront(subproc);
-      }
+      submitChildrenProcedures(subprocs);
     }
 
+    // if the procedure is complete and has a parent, count down the children latch
     if (procedure.isFinished() && procedure.hasParent()) {
-      Procedure parent = procedures.get(procedure.getParentProcId());
-      if (parent == null) {
-        assert procStack.isRollingback();
-        return;
+      countDownChildren(procStack, procedure);
+    }
+  }
+
+  private Procedure[] initializeChildren(final RootProcedureState procStack,
+      final Procedure procedure, final Procedure[] subprocs) {
+    assert subprocs != null : "expected subprocedures";
+    final long rootProcId = getRootProcedureId(procedure);
+    for (int i = 0; i < subprocs.length; ++i) {
+      final Procedure subproc = subprocs[i];
+      if (subproc == null) {
+        String msg = "subproc[" + i + "] is null, aborting the procedure";
+        procedure.setFailure(new RemoteProcedureException(msg,
+          new IllegalArgumentIOException(msg)));
+        return null;
       }
 
-      // If this procedure is the last child awake the parent procedure
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(parent + " child is done: " + procedure);
+      assert subproc.getState() == ProcedureState.INITIALIZING : subproc;
+      subproc.setParentProcId(procedure.getProcId());
+      subproc.setRootProcId(rootProcId);
+      subproc.setProcId(nextProcId());
+      procStack.addSubProcedure(subproc);
+    }
+
+    if (!procedure.isFailed()) {
+      procedure.setChildrenLatch(subprocs.length);
+      switch (procedure.getState()) {
+        case RUNNABLE:
+          procedure.setState(ProcedureState.WAITING);
+          break;
+        case WAITING_TIMEOUT:
+          timeoutExecutor.add(procedure);
+          break;
+        default:
+          break;
       }
-      if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
-        parent.setState(ProcedureState.RUNNABLE);
-        store.update(parent);
-        scheduler.addFront(parent);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace(parent + " all the children finished their work, resume.");
-        }
-        return;
+    }
+    return subprocs;
+  }
+
+  private void submitChildrenProcedures(final Procedure[] subprocs) {
+    for (int i = 0; i < subprocs.length; ++i) {
+      final Procedure subproc = subprocs[i];
+      assert !procedures.containsKey(subproc.getProcId());
+      procedures.put(subproc.getProcId(), subproc);
+      scheduler.addFront(subproc);
+    }
+  }
+
+  private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) {
+    final Procedure parent = procedures.get(procedure.getParentProcId());
+    if (parent == null) {
+      assert procStack.isRollingback();
+      return;
+    }
+
+    // If this procedure is the last child awake the parent procedure
+    final boolean isTraceEnabled = LOG.isTraceEnabled();
+    if (isTraceEnabled) {
+      LOG.trace(parent + " child is done: " + procedure);
+    }
+
+    if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) {
+      parent.setState(ProcedureState.RUNNABLE);
+      store.update(parent);
+      scheduler.addFront(parent);
+      if (isTraceEnabled) {
+        LOG.trace(parent + " all the children finished their work, resume.");
       }
+      return;
     }
   }
 
@@ -1329,66 +1334,6 @@ public class ProcedureExecutor<TEnvironment> {
     // (The interrupted procedure will be retried on the next run)
   }
 
-  private void sendProcedureLoadedNotification(final long procId) {
-    if (!this.listeners.isEmpty()) {
-      for (ProcedureExecutorListener listener: this.listeners) {
-        try {
-          listener.procedureLoaded(procId);
-        } catch (Throwable e) {
-          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
-        }
-      }
-    }
-  }
-
-  private void sendProcedureAddedNotification(final long procId) {
-    if (!this.listeners.isEmpty()) {
-      for (ProcedureExecutorListener listener: this.listeners) {
-        try {
-          listener.procedureAdded(procId);
-        } catch (Throwable e) {
-          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
-        }
-      }
-    }
-  }
-
-  private void sendProcedureFinishedNotification(final long procId) {
-    if (!this.listeners.isEmpty()) {
-      for (ProcedureExecutorListener listener: this.listeners) {
-        try {
-          listener.procedureFinished(procId);
-        } catch (Throwable e) {
-          LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e);
-        }
-      }
-    }
-  }
-
-  private long nextProcId() {
-    long procId = lastProcId.incrementAndGet();
-    if (procId < 0) {
-      while (!lastProcId.compareAndSet(procId, 0)) {
-        procId = lastProcId.get();
-        if (procId >= 0)
-          break;
-      }
-      while (procedures.containsKey(procId)) {
-        procId = lastProcId.incrementAndGet();
-      }
-    }
-    return procId;
-  }
-
-  @VisibleForTesting
-  protected long getLastProcId() {
-    return lastProcId.get();
-  }
-
-  private Long getRootProcedureId(Procedure proc) {
-    return Procedure.getRootProcedureId(procedures, proc);
-  }
-
   private void execCompletionCleanup(final Procedure proc) {
     final TEnvironment env = getEnvironment();
     if (proc.holdLock(env) && proc.hasLock(env)) {
@@ -1428,15 +1373,294 @@ public class ProcedureExecutor<TEnvironment> {
     sendProcedureFinishedNotification(proc.getProcId());
   }
 
-  public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long 
procId) {
-    ProcedureInfo result = completed.get(procId);
-    Procedure proc = null;
-    if (result == null) {
-      proc = procedures.get(procId);
-      if (proc == null) {
-        result = completed.get(procId);
+  // ==========================================================================
+  //  Worker Thread
+  // ==========================================================================
+  private final class WorkerThread extends StoppableThread {
+    private final AtomicLong executionStartTime = new 
AtomicLong(Long.MAX_VALUE);
+
+    public WorkerThread(final ThreadGroup group) {
+      super(group, "ProcedureExecutorWorker-" + workerId.incrementAndGet());
+    }
+
+    @Override
+    public void sendStopSignal() {
+      scheduler.signalAll();
+    }
+
+    @Override
+    public void run() {
+      final boolean isTraceEnabled = LOG.isTraceEnabled();
+      long lastUpdate = EnvironmentEdgeManager.currentTime();
+      while (isRunning() && keepAlive(lastUpdate)) {
+        final Procedure procedure = scheduler.poll(keepAliveTime, 
TimeUnit.MILLISECONDS);
+        if (procedure == null) continue;
+
+        activeExecutorCount.incrementAndGet();
+        executionStartTime.set(EnvironmentEdgeManager.currentTime());
+        try {
+          if (isTraceEnabled) {
+            LOG.trace("Trying to start the execution of " + procedure);
+          }
+          executeProcedure(procedure);
+        } finally {
+          activeExecutorCount.decrementAndGet();
+          lastUpdate = EnvironmentEdgeManager.currentTime();
+          executionStartTime.set(Long.MAX_VALUE);
+        }
       }
+      LOG.debug("worker thread terminated " + this);
+      workerThreads.remove(this);
+    }
+
+    /**
+     * @return elapsed time since the current procedure started (negative when idle)
+     */
+    public long getCurrentRunTime() {
+      return EnvironmentEdgeManager.currentTime() - executionStartTime.get();
+    }
+
+    private boolean keepAlive(final long lastUpdate) {
+      if (workerThreads.size() <= corePoolSize) return true;
+      return (EnvironmentEdgeManager.currentTime() - lastUpdate) < 
keepAliveTime;
+    }
+  }
+
+  // ==========================================================================
+  //  Timeout Thread
+  // ==========================================================================
+  private final class TimeoutExecutorThread extends StoppableThread {
+    private final DelayQueue<DelayedWithTimeout> queue = new 
DelayQueue<DelayedWithTimeout>();
+
+    public TimeoutExecutorThread(final ThreadGroup group) {
+      super(group, "ProcedureTimeoutExecutor");
+    }
+
+    @Override
+    public void sendStopSignal() {
+      queue.add(DelayedUtil.DELAYED_POISON);
+    }
+
+    @Override
+    public void run() {
+      final boolean isTraceEnabled = LOG.isTraceEnabled();
+      while (isRunning()) {
+        final DelayedWithTimeout task = 
DelayedUtil.takeWithoutInterrupt(queue);
+        if (task == null || task == DelayedUtil.DELAYED_POISON) {
+          // the executor may be shutting down,
+          // and the task is just the shutdown request
+          continue;
+        }
+
+        if (isTraceEnabled) {
+          LOG.trace("Trying to start the execution of " + task);
+        }
+
+        // execute the task
+        if (task instanceof InlineChore) {
+          execInlineChore((InlineChore)task);
+        } else if (task instanceof DelayedProcedure) {
+          execDelayedProcedure((DelayedProcedure)task);
+        } else {
+          LOG.error("CODE-BUG unknown timeout task type " + task);
+        }
+      }
+    }
+
+    public void add(final InlineChore chore) {
+      chore.refreshTimeout();
+      queue.add(chore);
+    }
+
+    public void add(final Procedure procedure) {
+      assert procedure.getState() == ProcedureState.WAITING_TIMEOUT;
+      queue.add(new DelayedProcedure(procedure));
+    }
+
+    public boolean remove(final Procedure procedure) {
+      return queue.remove(new DelayedProcedure(procedure));
+    }
+
+    private void execInlineChore(final InlineChore chore) {
+      chore.run();
+      add(chore);
+    }
+
+    private void execDelayedProcedure(final DelayedProcedure delayed) {
+      // TODO: treat this as a normal procedure, add it to the scheduler and
+      // let one of the workers handle it.
+      // Today we consider ProcedureInMemoryChore as InlineChores
+      final Procedure procedure = delayed.getObject();
+      if (procedure instanceof ProcedureInMemoryChore) {
+        executeInMemoryChore((ProcedureInMemoryChore)procedure);
+        // if the procedure is in a waiting state again, put it back in the 
queue
+        procedure.updateTimestamp();
+        if (procedure.isWaiting()) {
+          delayed.setTimeoutTimestamp(procedure.getTimeoutTimestamp());
+          queue.add(delayed);
+        }
+      } else {
+        executeTimedoutProcedure(procedure);
+      }
+    }
+
+    private void executeInMemoryChore(final ProcedureInMemoryChore chore) {
+      if (!chore.isWaiting()) return;
+
+      // The ProcedureInMemoryChore is a special case, and it acts as a chore.
+      // instead of bringing the Chore class in, we reuse this timeout thread 
for
+      // this special case.
+      try {
+        chore.periodicExecute(getEnvironment());
+      } catch (Throwable e) {
+        LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e);
+      }
+    }
+
+    private void executeTimedoutProcedure(final Procedure proc) {
+      // The procedure received a timeout. if the procedure itself does not 
handle it,
+      // call abort() and add the procedure back in the queue for rollback.
+      if (proc.setTimeoutFailure(getEnvironment())) {
+        long rootProcId = Procedure.getRootProcedureId(procedures, proc);
+        RootProcedureState procStack = rollbackStack.get(rootProcId);
+        procStack.abort();
+        store.update(proc);
+        scheduler.addFront(proc);
+      }
+    }
+  }
+
+  private static final class DelayedProcedure
+      extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> {
+    public DelayedProcedure(final Procedure procedure) {
+      super(procedure, procedure.getTimeoutTimestamp());
+    }
+  }
+
+  private static abstract class StoppableThread extends Thread {
+    public StoppableThread(final ThreadGroup group, final String name) {
+      super(group, name);
+    }
+
+    public abstract void sendStopSignal();
+
+    public void awaitTermination() {
+      try {
+        final long startTime = EnvironmentEdgeManager.currentTime();
+        for (int i = 0; isAlive(); ++i) {
+          sendStopSignal();
+          join(250);
+          if (i > 0 && (i % 8) == 0) {
+            LOG.warn("waiting termination of thread " + getName() + ", " +
+              StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - 
startTime));
+          }
+        }
+      } catch (InterruptedException e) {
+        LOG.warn(getName() + " join wait got interrupted", e);
+      }
+    }
+  }
+
+  // ==========================================================================
+  //  Inline Chores (executors internal chores)
+  // ==========================================================================
+  private static abstract class InlineChore extends DelayedUtil.DelayedObject 
implements Runnable {
+    private long timeout;
+
+    public abstract int getTimeoutInterval();
+
+    protected void refreshTimeout() {
+      this.timeout = EnvironmentEdgeManager.currentTime() + 
getTimeoutInterval();
+    }
+
+    @Override
+    public long getTimeoutTimestamp() {
+      return timeout;
+    }
+  }
+
+  // 
----------------------------------------------------------------------------
+  // TODO-MAYBE: Should we provide a InlineChore to notify the store with the
+  // full set of procedures pending and completed to write a compacted
+  // version of the log (in case is a log)?
+  // In theory no, procedures have a short life, so at some point the store
+  // will have the tracker saying everything is in the last log.
+  // 
----------------------------------------------------------------------------
+
+  private final class WorkerMonitor extends InlineChore {
+    public static final String WORKER_MONITOR_INTERVAL_CONF_KEY =
+        "hbase.procedure.worker.monitor.interval.msec";
+    private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec
+
+    public static final String WORKER_STUCK_THRESHOLD_CONF_KEY =
+        "hbase.procedure.worker.stuck.threshold.msec";
+    private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec
+
+    public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY =
+        "hbase.procedure.worker.add.stuck.percentage";
+    private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 
50% stuck
+
+    private float addWorkerStuckPercentage = 
DEFAULT_WORKER_ADD_STUCK_PERCENTAGE;
+    private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL;
+    private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD;
+
+    public WorkerMonitor() {
+      refreshConfig();
+    }
+
+    @Override
+    public void run() {
+      final int stuckCount = checkForStuckWorkers();
+      checkThreadCount(stuckCount);
+
+      // refresh interval (poor man dynamic conf update)
+      refreshConfig();
+    }
+
+    private int checkForStuckWorkers() {
+      // check if any of the worker is stuck
+      int stuckCount = 0;
+      for (WorkerThread worker: workerThreads) {
+        if (worker.getCurrentRunTime() < stuckThreshold) {
+          continue;
+        }
+
+        // WARN the worker is stuck
+        stuckCount++;
+        LOG.warn("found worker stuck " + worker +
+            " run time " + 
StringUtils.humanTimeDiff(worker.getCurrentRunTime()));
+      }
+      return stuckCount;
+    }
+
+    private void checkThreadCount(final int stuckCount) {
+      // nothing to do if no worker is stuck or there are no runnable tasks
+      if (stuckCount < 1 || !scheduler.hasRunnables()) return;
+
+      // add a new thread if the worker stuck percentage exceed the threshold 
limit
+      // and every handler is active.
+      final float stuckPerc = ((float)stuckCount) / workerThreads.size();
+      if (stuckPerc >= addWorkerStuckPercentage &&
+          activeExecutorCount.get() == workerThreads.size()) {
+        final WorkerThread worker = new WorkerThread(threadGroup);
+        workerThreads.add(worker);
+        worker.start();
+        LOG.debug("added a new worker thread " + worker);
+      }
+    }
+
+    private void refreshConfig() {
+      addWorkerStuckPercentage = 
conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY,
+          DEFAULT_WORKER_ADD_STUCK_PERCENTAGE);
+      timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY,
+        DEFAULT_WORKER_MONITOR_INTERVAL);
+      stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY,
+        DEFAULT_WORKER_STUCK_THRESHOLD);
+    }
+
+    @Override
+    public int getTimeoutInterval() {
+      return timeoutInterval;
     }
-    return new Pair(result, proc);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
new file mode 100644
index 0000000..ea34c49
--- /dev/null
+++ 
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2.util;
+
+import java.util.Objects;
+import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class DelayedUtil {
+  private DelayedUtil() { }
+
+  public interface DelayedWithTimeout extends Delayed {
+    long getTimeoutTimestamp();
+  }
+
+  public static final DelayedWithTimeout DELAYED_POISON = new 
DelayedWithTimeout() {
+    @Override
+    public long getTimeoutTimestamp() {
+      return 0;
+    }
+
+    @Override
+    public long getDelay(final TimeUnit unit) {
+      return 0;
+    }
+
+    @Override
+    public int compareTo(final Delayed o) {
+      return Long.compare(0, DelayedUtil.getTimeoutTimestamp(o));
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+      return this == other;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(POISON)";
+    }
+  };
+
+  public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> 
queue) {
+    try {
+      return queue.take();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return null;
+    }
+  }
+
+  public static long getRemainingTime(final TimeUnit resultUnit, final long 
timeoutTime) {
+    final long currentTime = EnvironmentEdgeManager.currentTime();
+    if (currentTime >= timeoutTime) {
+      return 0;
+    }
+    return resultUnit.convert(timeoutTime - currentTime, 
TimeUnit.MILLISECONDS);
+  }
+
+  public static int compareDelayed(final Delayed o1, final Delayed o2) {
+    return Long.compare(getTimeoutTimestamp(o1), getTimeoutTimestamp(o2));
+  }
+
+  private static long getTimeoutTimestamp(final Delayed o) {
+    assert o instanceof DelayedWithTimeout : "expected DelayedWithTimeout 
instance, got " + o;
+    return ((DelayedWithTimeout)o).getTimeoutTimestamp();
+  }
+
+  public static abstract class DelayedObject implements DelayedWithTimeout {
+    @Override
+    public long getDelay(final TimeUnit unit) {
+      return DelayedUtil.getRemainingTime(unit, getTimeoutTimestamp());
+    }
+
+    @Override
+    public int compareTo(final Delayed other) {
+      return DelayedUtil.compareDelayed(this, other);
+    }
+  }
+
+  public static abstract class DelayedContainer<T> extends DelayedObject {
+    private final T object;
+
+    public DelayedContainer(final T object) {
+      this.object = object;
+    }
+
+    public T getObject() {
+      return this.object;
+    }
+
+    @Override
+    public boolean equals(final Object other) {
+      if (other == this) return true;
+      if (!(other instanceof DelayedContainer)) return false;
+      return Objects.equals(getObject(), 
((DelayedContainer)other).getObject());
+    }
+
+    @Override
+    public int hashCode() {
+      return object != null ? object.hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(" + getObject() + ")";
+    }
+  }
+
+  public static class DelayedContainerWithTimestamp<T> extends 
DelayedContainer<T> {
+    private long timeoutTimestamp;
+
+    public DelayedContainerWithTimestamp(final T object, final long 
timeoutTimestamp) {
+      super(object);
+      setTimeoutTimestamp(timeoutTimestamp);
+    }
+
+    @Override
+    public long getTimeoutTimestamp() {
+      return timeoutTimestamp;
+    }
+
+    public void setTimeoutTimestamp(final long timeoutTimestamp) {
+      this.timeoutTimestamp = timeoutTimestamp;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
index f2c7e6b..0b4e4ed 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java
@@ -74,8 +74,8 @@ public class ProcedureTestingUtility {
   public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor,
       Runnable beforeStartAction, boolean failOnCorrupted) throws Exception {
     ProcedureStore procStore = procExecutor.getStore();
-    int storeThreads = procExecutor.getNumThreads();
-    int execThreads = procExecutor.getNumThreads();
+    int storeThreads = procExecutor.getCorePoolSize();
+    int execThreads = procExecutor.getCorePoolSize();
     // stop
     procExecutor.stop();
     procExecutor.join();

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
new file mode 100644
index 0000000..851dc3e
--- /dev/null
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.procedure2;
+
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
+import 
org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure;
+import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.util.Threads;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+
+@Category({MasterTests.class, SmallTests.class})
+public class TestProcedureExecutor {
+  private static final Log LOG = 
LogFactory.getLog(TestProcedureExecutor.class);
+
+  private TestProcEnv procEnv;
+  private NoopProcedureStore procStore;
+  private ProcedureExecutor<TestProcEnv> procExecutor;
+
+  private HBaseCommonTestingUtility htu;
+
+  @Before
+  public void setUp() throws Exception {
+    htu = new HBaseCommonTestingUtility();
+
+    // NOTE: The executor will be created by each test
+    procEnv = new TestProcEnv();
+    procStore = new NoopProcedureStore();
+    procStore.start(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    procExecutor.stop();
+    procStore.stop(false);
+    procExecutor.join();
+  }
+
+  private void createNewExecutor(final Configuration conf, final int 
numThreads) throws Exception {
+    procExecutor = new ProcedureExecutor(conf, procEnv, procStore);
+    procExecutor.start(numThreads, true);
+  }
+
+  @Test(timeout=60000)
+  public void testWorkerStuck() throws Exception {
+    // replace the executor
+    final Configuration conf = new Configuration(htu.getConfiguration());
+    conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f);
+    conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500);
+    conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750);
+
+    final int NUM_THREADS = 2;
+    createNewExecutor(conf, NUM_THREADS);
+
+    Semaphore latch1 = new Semaphore(2);
+    latch1.acquire(2);
+    BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1);
+
+    Semaphore latch2 = new Semaphore(2);
+    latch2.acquire(2);
+    BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2);
+
+    long busyProcId1 = procExecutor.submitProcedure(busyProc1);
+    long busyProcId2 = procExecutor.submitProcedure(busyProc2);
+    long otherProcId = procExecutor.submitProcedure(new NoopProcedure());
+
+    // wait until a new worker is being created
+    int threads1 = waitThreadCount(NUM_THREADS + 1);
+    LOG.info("new threads got created: " + (threads1 - NUM_THREADS));
+    assertEquals(NUM_THREADS + 1, threads1);
+
+    ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId);
+    assertEquals(true, procExecutor.isFinished(otherProcId));
+    ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId);
+
+    assertEquals(true, procExecutor.isRunning());
+    assertEquals(false, procExecutor.isFinished(busyProcId1));
+    assertEquals(false, procExecutor.isFinished(busyProcId2));
+
+    // let the busy procedures pass step 1 (they block again on step 2)
+    latch1.release();
+    latch2.release();
+
+    LOG.info("set keep alive and wait threads being removed");
+    procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS);
+    int threads2 = waitThreadCount(NUM_THREADS);
+    LOG.info("threads got removed: " + (threads1 - threads2));
+    assertEquals(NUM_THREADS, threads2);
+
+    // terminate the busy procedures
+    latch1.release();
+    latch2.release();
+
+    // wait for all procs to complete
+    ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1);
+    ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2);
+    ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1);
+    ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2);
+  }
+
+  private int waitThreadCount(final int expectedThreads) {
+    while (procExecutor.isRunning()) {
+      if (procExecutor.getWorkerThreadCount() == expectedThreads) {
+        break;
+      }
+      LOG.debug("waiting for thread count=" + expectedThreads +
+        " current=" + procExecutor.getWorkerThreadCount());
+      Threads.sleepWithoutInterrupt(250);
+    }
+    return procExecutor.getWorkerThreadCount();
+  }
+
+  public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> {
+    private final Semaphore latch;
+
+    public BusyWaitProcedure(final Semaphore latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    protected Procedure[] execute(final TestProcEnv env) {
+      try {
+        LOG.info("worker started " + this);
+        if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
+          throw new Exception("waited too long");
+        }
+
+        LOG.info("worker step 2 " + this);
+        if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) {
+          throw new Exception("waited too long");
+        }
+      } catch (Exception e) {
+        LOG.error("got unexpected exception", e);
+        setFailure("BusyWaitProcedure", e);
+      }
+      return null;
+    }
+  }
+
+  private class TestProcEnv { }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
----------------------------------------------------------------------
diff --git 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
index 8bc8fa8..50ccfa6 100644
--- 
a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
+++ 
b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java
@@ -76,17 +76,17 @@ public class TestProcedureInMemoryChore {
     CountDownLatch latch = new CountDownLatch(nCountDown);
     TestLatchChore chore = new TestLatchChore(timeoutMSec, latch);
     procExecutor.addChore(chore);
-    assertTrue(chore.isRunnable());
+    assertTrue(chore.isWaiting());
     latch.await();
 
     // remove the chore and verify it is no longer executed
-    assertTrue(chore.isRunnable());
+    assertTrue(chore.isWaiting());
     procExecutor.removeChore(chore);
     latch = new CountDownLatch(nCountDown);
     chore.setLatch(latch);
     latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS);
     LOG.info("chore latch count=" + latch.getCount());
-    assertFalse(chore.isRunnable());
+    assertFalse(chore.isWaiting());
     assertTrue("latchCount=" + latch.getCount(), latch.getCount() > 0);
   }
 
@@ -104,6 +104,7 @@ public class TestProcedureInMemoryChore {
 
     @Override
     protected void periodicExecute(final TestProcEnv env) {
+      LOG.info("periodic execute " + this);
       latch.countDown();
     }
   }

Reply via email to