HBASE-16846 Procedure v2 - executor cleanup
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c6e9dabe Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c6e9dabe Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c6e9dabe Branch: refs/heads/master Commit: c6e9dabe625b910b3b140d9234710f6e8b968b37 Parents: c8e9a29 Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Mon Oct 17 10:23:33 2016 -0700 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Mon Oct 17 10:30:59 2016 -0700 ---------------------------------------------------------------------- .../procedure2/AbstractProcedureScheduler.java | 24 +- .../hadoop/hbase/procedure2/Procedure.java | 449 +++++---- .../hbase/procedure2/ProcedureExecutor.java | 996 ++++++++++++------- .../hbase/procedure2/util/DelayedUtil.java | 150 +++ .../procedure2/ProcedureTestingUtility.java | 4 +- .../hbase/procedure2/TestProcedureExecutor.java | 171 ++++ .../procedure2/TestProcedureInMemoryChore.java | 7 +- .../hbase/procedure2/util/TestDelayedUtil.java | 87 ++ .../org/apache/hadoop/hbase/master/HMaster.java | 2 + .../master/procedure/MasterProcedureEnv.java | 8 +- .../procedure/MasterProcedureScheduler.java | 8 +- ...ProcedureSchedulerPerformanceEvaluation.java | 18 +- .../procedure/TestMasterProcedureScheduler.java | 2 + ...TestMasterProcedureSchedulerConcurrency.java | 11 + 14 files changed, 1330 insertions(+), 607 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java index c4ae877..dc94983 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/AbstractProcedureScheduler.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.procedure2; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -123,17 +122,26 @@ public abstract class AbstractProcedureScheduler implements ProcedureScheduler { return poll(unit.toNanos(timeout)); } - public Procedure poll(long nanos) { - final boolean waitForever = (nanos < 0); + @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") + public Procedure poll(final long nanos) { schedLock(); try { - while (!queueHasRunnables()) { - if (!running) return null; - if (waitForever) { + if (!running) { + LOG.debug("the scheduler is not running"); + return null; + } + + if (!queueHasRunnables()) { + // WA_AWAIT_NOT_IN_LOOP: we are not in a loop because we want the caller + // to take decisions after a wake/interruption. + if (nanos < 0) { schedWaitCond.await(); } else { - if (nanos <= 0) return null; - nanos = schedWaitCond.awaitNanos(nanos); + schedWaitCond.awaitNanos(nanos); + } + if (!queueHasRunnables()) { + nullPollCalls++; + return null; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index eafb19a..19604e5 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -57,26 +57,30 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { + protected static final long NO_PROC_ID = -1; + protected static final int NO_TIMEOUT = -1; + // unchanged after initialization + private NonceKey nonceKey = null; private String owner = null; - private Long parentProcId = null; - private Long procId = null; + private long parentProcId = NO_PROC_ID; + private long rootProcId = NO_PROC_ID; + private long procId = NO_PROC_ID; private long startTime; // runtime state, updated every operation private ProcedureState state = ProcedureState.INITIALIZING; - private Integer timeout = null; + private RemoteProcedureException exception = null; private int[] stackIndexes = null; private int childrenLatch = 0; - private long lastUpdate; - // TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks - private boolean suspended = false; + private volatile int timeout = NO_TIMEOUT; + private volatile long lastUpdate; - private RemoteProcedureException exception = null; - private byte[] result = null; + private volatile byte[] result = null; - private NonceKey nonceKey = null; + // TODO: it will be nice having pointers to allow the scheduler doing suspend/resume tricks + private boolean suspended = false; /** * The main code of the procedure. It must be idempotent since execute() @@ -235,13 +239,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * @return the StringBuilder */ protected StringBuilder toStringSimpleSB() { - StringBuilder sb = new StringBuilder(); + final StringBuilder sb = new StringBuilder(); toStringClassDetails(sb); - if (procId != null) { - sb.append(" id="); - sb.append(getProcId()); - } + sb.append(" id="); + sb.append(getProcId()); if (hasParent()) { sb.append(" parent="); @@ -256,6 +258,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { sb.append(" state="); toStringState(sb); + if (hasException()) { + sb.append(" failed=" + getException()); + } + return sb; } @@ -264,7 +270,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { * details */ public String toStringDetails() { - StringBuilder sb = toStringSimpleSB(); + final StringBuilder sb = toStringSimpleSB(); sb.append(" startTime="); sb.append(getStartTime()); @@ -272,7 +278,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { sb.append(" lastUpdate="); sb.append(getLastUpdate()); - int[] stackIndices = getStackIndexes(); + final int[] stackIndices = getStackIndexes(); if (stackIndices != null) { sb.append("\n"); sb.append("stackIndexes="); @@ -285,7 +291,6 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { protected String toStringClass() { StringBuilder sb = new StringBuilder(); toStringClassDetails(sb); - return sb.toString(); } @@ -309,101 +314,180 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { builder.append(getClass().getName()); } + // ========================================================================== + // Those fields are unchanged after initialization. + // + // Each procedure will get created from the user or during + // ProcedureExecutor.start() during the load() phase and then submitted + // to the executor. these fields will never be changed after initialization + // ========================================================================== + public long getProcId() { + return procId; + } + + public boolean hasParent() { + return parentProcId != NO_PROC_ID; + } + + public long getParentProcId() { + return parentProcId; + } + + public long getRootProcId() { + return rootProcId; + } + + public NonceKey getNonceKey() { + return nonceKey; + } + + public long getStartTime() { + return startTime; + } + + public String getOwner() { + return owner; + } + + public boolean hasOwner() { + return owner != null; + } + /** - * @return the serialized result if any, otherwise null + * Called by the ProcedureExecutor to assign the ID to the newly created procedure. */ - public byte[] getResult() { - return result; + @VisibleForTesting + @InterfaceAudience.Private + protected void setProcId(final long procId) { + this.procId = procId; + this.startTime = EnvironmentEdgeManager.currentTime(); + setState(ProcedureState.RUNNABLE); } /** - * The procedure may leave a "result" on completion. - * @param result the serialized result that will be passed to the client + * Called by the ProcedureExecutor to assign the parent to the newly created procedure. */ - protected void setResult(final byte[] result) { - this.result = result; + @InterfaceAudience.Private + protected void setParentProcId(final long parentProcId) { + this.parentProcId = parentProcId; } - public long getProcId() { - return procId; + @InterfaceAudience.Private + protected void setRootProcId(final long rootProcId) { + this.rootProcId = rootProcId; } - public boolean hasParent() { - return parentProcId != null; + /** + * Called by the ProcedureExecutor to set the value to the newly created procedure. + */ + @VisibleForTesting + @InterfaceAudience.Private + protected void setNonceKey(final NonceKey nonceKey) { + this.nonceKey = nonceKey; } - public boolean hasException() { - return exception != null; + @VisibleForTesting + @InterfaceAudience.Private + public void setOwner(final String owner) { + this.owner = StringUtils.isEmpty(owner) ? null : owner; + } + + /** + * Called on store load to initialize the Procedure internals after + * the creation/deserialization. + */ + @InterfaceAudience.Private + protected void setStartTime(final long startTime) { + this.startTime = startTime; + } + + // ========================================================================== + // runtime state - timeout related + // ========================================================================== + /** + * @param timeout timeout interval in msec + */ + protected void setTimeout(final int timeout) { + this.timeout = timeout; } public boolean hasTimeout() { - return timeout != null; + return timeout != NO_TIMEOUT; } - public long getParentProcId() { - return parentProcId.longValue(); + /** + * @return the timeout in msec + */ + public int getTimeout() { + return timeout; } - public NonceKey getNonceKey() { - return nonceKey; + /** + * Called on store load to initialize the Procedure internals after + * the creation/deserialization. + */ + @InterfaceAudience.Private + protected void setLastUpdate(final long lastUpdate) { + this.lastUpdate = lastUpdate; } /** - * @return true if the procedure is in a RUNNABLE state. + * Called by ProcedureExecutor after each time a procedure step is executed. */ - protected synchronized boolean isRunnable() { - return state == ProcedureState.RUNNABLE; + @InterfaceAudience.Private + protected void updateTimestamp() { + this.lastUpdate = EnvironmentEdgeManager.currentTime(); } - public synchronized boolean isInitializing() { - return state == ProcedureState.INITIALIZING; + public long getLastUpdate() { + return lastUpdate; } /** - * @return true if the procedure has failed. - * true may mean failed but not yet rolledback or failed and rolledback. + * Timeout of the next timeout. + * Called by the ProcedureExecutor if the procedure has timeout set and + * the procedure is in the waiting queue. + * @return the timestamp of the next timeout. */ - public synchronized boolean isFailed() { - return exception != null || state == ProcedureState.ROLLEDBACK; + @InterfaceAudience.Private + protected long getTimeoutTimestamp() { + return getLastUpdate() + getTimeout(); } + // ========================================================================== + // runtime state + // ========================================================================== /** - * @return true if the procedure is finished successfully. + * @return the time elapsed between the last update and the start time of the procedure. */ - public synchronized boolean isSuccess() { - return state == ProcedureState.FINISHED && exception == null; + public long elapsedTime() { + return getLastUpdate() - getStartTime(); } /** - * @return true if the procedure is finished. The Procedure may be completed - * successfuly or failed and rolledback. + * @return the serialized result if any, otherwise null */ - public synchronized boolean isFinished() { - switch (state) { - case ROLLEDBACK: - return true; - case FINISHED: - return exception == null; - default: - break; - } - return false; + public byte[] getResult() { + return result; } /** - * @return true if the procedure is waiting for a child to finish or for an external event. + * The procedure may leave a "result" on completion. + * @param result the serialized result that will be passed to the client */ - public synchronized boolean isWaiting() { - switch (state) { - case WAITING: - case WAITING_TIMEOUT: - return true; - default: - break; - } - return false; + protected void setResult(final byte[] result) { + this.result = result; } + // ============================================================================================== + // Runtime state, updated every operation by the ProcedureExecutor + // + // There is always 1 thread at the time operating on the state of the procedure. + // The ProcedureExecutor may check and set states, or some Procecedure may + // update its own state. but no concurrent updates. we use synchronized here + // just because the procedure can get scheduled on different executor threads on each step. + // ============================================================================================== + /** * @return true if the procedure is in a suspended state, * waiting for the resources required to execute the procedure will become available. @@ -421,55 +505,60 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { suspended = false; } - public synchronized RemoteProcedureException getException() { - return exception; - } - - public long getStartTime() { - return startTime; - } - - public synchronized long getLastUpdate() { - return lastUpdate; + /** + * @return true if the procedure is in a RUNNABLE state. + */ + protected synchronized boolean isRunnable() { + return state == ProcedureState.RUNNABLE; } - public synchronized long elapsedTime() { - return lastUpdate - startTime; + public synchronized boolean isInitializing() { + return state == ProcedureState.INITIALIZING; } /** - * @param timeout timeout in msec + * @return true if the procedure has failed. + * true may mean failed but not yet rolledback or failed and rolledback. */ - protected void setTimeout(final int timeout) { - this.timeout = timeout; + public synchronized boolean isFailed() { + return exception != null || state == ProcedureState.ROLLEDBACK; } /** - * @return the timeout in msec + * @return true if the procedure is finished successfully. */ - public int getTimeout() { - return timeout.intValue(); + public synchronized boolean isSuccess() { + return state == ProcedureState.FINISHED && exception == null; } /** - * @return the remaining time before the timeout + * @return true if the procedure is finished. The Procedure may be completed + * successfuly or failed and rolledback. */ - public long getTimeRemaining() { - return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime)); - } - - @VisibleForTesting - @InterfaceAudience.Private - public void setOwner(final String owner) { - this.owner = StringUtils.isEmpty(owner) ? null : owner; - } - - public String getOwner() { - return owner; + public synchronized boolean isFinished() { + switch (state) { + case ROLLEDBACK: + return true; + case FINISHED: + return exception == null; + default: + break; + } + return false; } - public boolean hasOwner() { - return owner != null; + /** + * @return true if the procedure is waiting for a child to finish or for an external event. + */ + public synchronized boolean isWaiting() { + switch (state) { + case WAITING: + case WAITING_TIMEOUT: + return true; + default: + break; + } + return false; } @VisibleForTesting @@ -514,101 +603,12 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return false; } - /** - * Called by the ProcedureExecutor to assign the ID to the newly created procedure. - */ - @VisibleForTesting - @InterfaceAudience.Private - protected void setProcId(final long procId) { - this.procId = procId; - this.startTime = EnvironmentEdgeManager.currentTime(); - setState(ProcedureState.RUNNABLE); - } - - /** - * Called by the ProcedureExecutor to assign the parent to the newly created procedure. - */ - @InterfaceAudience.Private - protected void setParentProcId(final long parentProcId) { - this.parentProcId = parentProcId; - } - - /** - * Called by the ProcedureExecutor to set the value to the newly created procedure. - */ - @VisibleForTesting - @InterfaceAudience.Private - protected void setNonceKey(final NonceKey nonceKey) { - this.nonceKey = nonceKey; - } - - /** - * Internal method called by the ProcedureExecutor that starts the - * user-level code execute(). - */ - @InterfaceAudience.Private - protected Procedure[] doExecute(final TEnvironment env) - throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { - try { - updateTimestamp(); - return execute(env); - } finally { - updateTimestamp(); - } - } - - /** - * Internal method called by the ProcedureExecutor that starts the - * user-level code rollback(). - */ - @InterfaceAudience.Private - protected void doRollback(final TEnvironment env) - throws IOException, InterruptedException { - try { - updateTimestamp(); - rollback(env); - } finally { - updateTimestamp(); - } - } - - /** - * Internal method called by the ProcedureExecutor that starts the - * user-level code acquireLock(). - */ - @InterfaceAudience.Private - protected boolean doAcquireLock(final TEnvironment env) { - return acquireLock(env); - } - - /** - * Internal method called by the ProcedureExecutor that starts the - * user-level code releaseLock(). - */ - @InterfaceAudience.Private - protected void doReleaseLock(final TEnvironment env) { - releaseLock(env); - } - - /** - * Called on store load to initialize the Procedure internals after - * the creation/deserialization. - */ - @InterfaceAudience.Private - protected void setStartTime(final long startTime) { - this.startTime = startTime; - } - - /** - * Called on store load to initialize the Procedure internals after - * the creation/deserialization. - */ - protected synchronized void setLastUpdate(final long lastUpdate) { - this.lastUpdate = lastUpdate; + public synchronized boolean hasException() { + return exception != null; } - protected synchronized void updateTimestamp() { - this.lastUpdate = EnvironmentEdgeManager.currentTime(); + public synchronized RemoteProcedureException getException() { + return exception; } /** @@ -629,8 +629,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { } /** - * Called by the ProcedureExecutor to notify that one of the sub-procedures - * has completed. + * Called by the ProcedureExecutor to notify that one of the sub-procedures has completed. */ @InterfaceAudience.Private protected synchronized boolean childrenCountDown() { @@ -643,6 +642,7 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return childrenLatch > 0; } + @InterfaceAudience.Private protected synchronized int getChildrenLatch() { return childrenLatch; } @@ -695,12 +695,63 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return stackIndexes; } + // ========================================================================== + // Internal methods - called by the ProcedureExecutor + // ========================================================================== + + /** + * Internal method called by the ProcedureExecutor that starts the user-level code execute(). + */ + @InterfaceAudience.Private + protected Procedure[] doExecute(final TEnvironment env) + throws ProcedureYieldException, ProcedureSuspendedException, InterruptedException { + try { + updateTimestamp(); + return execute(env); + } finally { + updateTimestamp(); + } + } + + /** + * Internal method called by the ProcedureExecutor that starts the user-level code rollback(). + */ + @InterfaceAudience.Private + protected void doRollback(final TEnvironment env) + throws IOException, InterruptedException { + try { + updateTimestamp(); + rollback(env); + } finally { + updateTimestamp(); + } + } + + /** + * Internal method called by the ProcedureExecutor that starts the user-level code acquireLock(). + */ + @InterfaceAudience.Private + protected boolean doAcquireLock(final TEnvironment env) { + return acquireLock(env); + } + + /** + * Internal method called by the ProcedureExecutor that starts the user-level code releaseLock(). + */ + @InterfaceAudience.Private + protected void doReleaseLock(final TEnvironment env) { + releaseLock(env); + } + @Override public int compareTo(final Procedure other) { - long diff = getProcId() - other.getProcId(); - return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; + return Long.compare(getProcId(), other.getProcId()); } + // ========================================================================== + // misc utils + // ========================================================================== + /** * Get an hashcode for the specified Procedure ID * @return the hashcode for the specified procId http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index 4976ea0..f167f4a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -24,20 +24,16 @@ import com.google.common.base.Preconditions; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.DelayQueue; -import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; @@ -50,6 +46,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore.ProcedureIterator; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; +import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.security.User; @@ -75,6 +73,13 @@ import org.apache.hadoop.hbase.util.Pair; public class ProcedureExecutor<TEnvironment> { private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class); + public static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; + private static final boolean DEFAULT_CHECK_OWNER_SET = false; + + public static final String WORKER_KEEP_ALIVE_TIME_CONF_KEY = + "hbase.procedure.worker.keep.alive.time.msec"; + private static final long DEFAULT_WORKER_KEEP_ALIVE_TIME = Long.MAX_VALUE; + Testing testing = null; public static class Testing { protected boolean killBeforeStoreUpdate = false; @@ -97,62 +102,6 @@ public class ProcedureExecutor<TEnvironment> { } /** - * Used by the DelayQueue to get the timeout interval of the procedure - */ - private static class DelayedContainer implements Delayed { - static final DelayedContainer POISON = new DelayedContainer(); - - /** null if poison */ - final Procedure proc; - final long timeoutTime; - - DelayedContainer(Procedure proc) { - assert proc != null; - this.proc = proc; - this.timeoutTime = proc.getLastUpdate() + proc.getTimeout(); - } - - DelayedContainer() { - this.proc = null; - this.timeoutTime = Long.MIN_VALUE; - } - - @Override - public long getDelay(TimeUnit unit) { - long currentTime = EnvironmentEdgeManager.currentTime(); - if (currentTime >= timeoutTime) { - return 0; - } - return unit.convert(timeoutTime - currentTime, TimeUnit.MICROSECONDS); - } - - /** - * @throws NullPointerException {@inheritDoc} - * @throws ClassCastException {@inheritDoc} - */ - @Override - public int compareTo(Delayed o) { - return Long.compare(timeoutTime, ((DelayedContainer)o).timeoutTime); - } - - @Override - public boolean equals(Object obj) { - if (obj == this) { - return true; - } - if (! (obj instanceof DelayedContainer)) { - return false; - } - return Objects.equals(proc, ((DelayedContainer)obj).proc); - } - - @Override - public int hashCode() { - return proc != null ? proc.hashCode() : 0; - } - } - - /** * Internal cleaner that removes the completed procedure results after a TTL. * NOTE: This is a special case handled in timeoutLoop(). * @@ -186,7 +135,7 @@ public class ProcedureExecutor<TEnvironment> { private final Map<Long, ProcedureInfo> completed; private final Map<NonceKey, Long> nonceKeysToProcIdsMap; private final ProcedureStore store; - private final Configuration conf; + private Configuration conf; public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store, final Map<Long, ProcedureInfo> completedMap, @@ -274,39 +223,33 @@ public class ProcedureExecutor<TEnvironment> { * Helper map to lookup whether the procedure already issued from the same client. * This map contains every root procedure. */ - private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = + private final ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = new ConcurrentHashMap<NonceKey, Long>(); - /** - * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state - * or periodic procedures. - */ - private final DelayQueue<DelayedContainer> waitingTimeout = - new DelayQueue<DelayedContainer>(); + private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = + new CopyOnWriteArrayList<ProcedureExecutorListener>(); + + private Configuration conf; + private ThreadGroup threadGroup; + private CopyOnWriteArrayList<WorkerThread> workerThreads; + private TimeoutExecutorThread timeoutExecutor; + private int corePoolSize; + + private volatile long keepAliveTime = Long.MAX_VALUE; /** * Scheduler/Queue that contains runnable procedures. */ private final ProcedureScheduler scheduler; - // TODO - private final ReentrantLock submitLock = new ReentrantLock(); private final AtomicLong lastProcId = new AtomicLong(-1); - - private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = - new CopyOnWriteArrayList<ProcedureExecutorListener>(); - + private final AtomicLong workerId = new AtomicLong(0); private final AtomicInteger activeExecutorCount = new AtomicInteger(0); private final AtomicBoolean running = new AtomicBoolean(false); private final TEnvironment environment; private final ProcedureStore store; - private final Configuration conf; - - private static final String CHECK_OWNER_SET_CONF_KEY = "hbase.procedure.check.owner.set"; private final boolean checkOwnerSet; - private Thread[] threads; - public ProcedureExecutor(final Configuration conf, final TEnvironment environment, final ProcedureStore store) { this(conf, environment, store, new SimpleProcedureScheduler()); @@ -318,15 +261,15 @@ public class ProcedureExecutor<TEnvironment> { this.scheduler = scheduler; this.store = store; this.conf = conf; - this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, true); + this.checkOwnerSet = conf.getBoolean(CHECK_OWNER_SET_CONF_KEY, DEFAULT_CHECK_OWNER_SET); + refreshConfiguration(conf); } private void load(final boolean abortOnCorruption) throws IOException { - Preconditions.checkArgument(completed.isEmpty()); - Preconditions.checkArgument(rollbackStack.isEmpty()); - Preconditions.checkArgument(procedures.isEmpty()); - Preconditions.checkArgument(waitingTimeout.isEmpty()); - Preconditions.checkArgument(scheduler.size() == 0); + Preconditions.checkArgument(completed.isEmpty(), "completed not empty"); + Preconditions.checkArgument(rollbackStack.isEmpty(), "rollback state not empty"); + Preconditions.checkArgument(procedures.isEmpty(), "procedure map not empty"); + Preconditions.checkArgument(scheduler.size() == 0, "run queue not empty"); store.load(new ProcedureStore.ProcedureLoader() { @Override @@ -435,6 +378,7 @@ public class ProcedureExecutor<TEnvironment> { RootProcedureState procStack = rollbackStack.get(rootProcId); procStack.loadStack(proc); + proc.setRootProcId(rootProcId); switch (proc.getState()) { case RUNNABLE: runnableList.add(proc); @@ -526,26 +470,21 @@ public class ProcedureExecutor<TEnvironment> { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. - threads = new Thread[numThreads + 1]; - LOG.info("Starting procedure executor threads=" + threads.length); - - // Initialize procedures executor - for (int i = 0; i < numThreads; ++i) { - threads[i] = new Thread("ProcedureExecutor-" + i) { - @Override - public void run() { - execLoop(); - } - }; - } + this.corePoolSize = numThreads; + LOG.info("Starting procedure executor threads=" + corePoolSize); - // Initialize procedures timeout handler (this is the +1 thread) - threads[numThreads] = new Thread("ProcedureExecutorTimeoutThread") { - @Override - public void run() { - timeoutLoop(); - } - }; + // Create the Thread Group for the executors + threadGroup = new ThreadGroup("ProcedureExecutor"); + + // Create the timeout executor + timeoutExecutor = new TimeoutExecutorThread(threadGroup); + + // Create the workers + workerId.set(0); + workerThreads = new CopyOnWriteArrayList<WorkerThread>(); + for (int i = 0; i < corePoolSize; ++i) { + workerThreads.add(new WorkerThread(threadGroup)); + } long st, et; @@ -571,10 +510,15 @@ public class ProcedureExecutor<TEnvironment> { store.getClass().getSimpleName(), StringUtils.humanTimeDiff(et - st))); // Start the executors. Here we must have the lastProcId set. - for (int i = 0; i < threads.length; ++i) { - threads[i].start(); + LOG.debug("start workers " + workerThreads.size()); + timeoutExecutor.start(); + for (WorkerThread worker: workerThreads) { + worker.start(); } + // Internal chores + timeoutExecutor.add(new WorkerMonitor()); + // Add completed cleaner chore addChore(new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap)); } @@ -586,42 +530,66 @@ public class ProcedureExecutor<TEnvironment> { LOG.info("Stopping the procedure executor"); scheduler.stop(); - waitingTimeout.add(DelayedContainer.POISON); + timeoutExecutor.sendStopSignal(); } public void join() { - boolean interrupted = false; + assert !isRunning() : "expected not running"; - for (int i = 0; i < threads.length; ++i) { - try { - threads[i].join(); - } catch (InterruptedException ex) { - interrupted = true; - } + // stop the timeout executor + timeoutExecutor.awaitTermination(); + timeoutExecutor = null; + + // stop the worker threads + for (WorkerThread worker: workerThreads) { + worker.awaitTermination(); } + workerThreads = null; - if (interrupted) { - Thread.currentThread().interrupt(); + // Destroy the Thread Group for the executors + try { + threadGroup.destroy(); + } catch (IllegalThreadStateException e) { + LOG.error("thread group " + threadGroup + " contains running threads"); + threadGroup.list(); + } finally { + threadGroup = null; } + // reset the in-memory state for testing completed.clear(); rollbackStack.clear(); procedures.clear(); nonceKeysToProcIdsMap.clear(); - waitingTimeout.clear(); scheduler.clear(); lastProcId.set(-1); } + public void refreshConfiguration(final Configuration conf) { + this.conf = conf; + setKeepAliveTime(conf.getLong(WORKER_KEEP_ALIVE_TIME_CONF_KEY, + DEFAULT_WORKER_KEEP_ALIVE_TIME), TimeUnit.MILLISECONDS); + } + + // ========================================================================== + // Accessors + // ========================================================================== public boolean isRunning() { return running.get(); } /** - * @return the number of execution threads. + * @return the current number of worker threads. + */ + public int getWorkerThreadCount() { + return workerThreads.size(); + } + + /** + * @return the core pool size settings. */ - public int getNumThreads() { - return threads == null ? 0 : (threads.length - 1); + public int getCorePoolSize() { + return corePoolSize; } public int getActiveExecutorCount() { @@ -636,41 +604,30 @@ public class ProcedureExecutor<TEnvironment> { return this.store; } - public void registerListener(ProcedureExecutorListener listener) { - this.listeners.add(listener); + protected ProcedureScheduler getScheduler() { + return scheduler; } - public boolean unregisterListener(ProcedureExecutorListener listener) { - return this.listeners.remove(listener); + public void setKeepAliveTime(final long keepAliveTime, final TimeUnit timeUnit) { + this.keepAliveTime = timeUnit.toMillis(keepAliveTime); + this.scheduler.signalAll(); } - /** - * List procedures. - * @return the procedures in a list - */ - public List<ProcedureInfo> listProcedures() { - final List<ProcedureInfo> procedureLists = - new ArrayList<ProcedureInfo>(procedures.size() + completed.size()); - for (Map.Entry<Long, Procedure> p: procedures.entrySet()) { - procedureLists.add(ProcedureUtil.convertToProcedureInfo(p.getValue())); - } - for (Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) { - // Note: The procedure could show up twice in the list with different state, as - // it could complete after we walk through procedures list and insert into - // procedureList - it is ok, as we will use the information in the ProcedureInfo - // to figure it out; to prevent this would increase the complexity of the logic. - procedureLists.add(e.getValue()); - } - return procedureLists; + public long getKeepAliveTime(final TimeUnit timeUnit) { + return timeUnit.convert(keepAliveTime, TimeUnit.MILLISECONDS); } + // ========================================================================== + // Submit/Remove Chores + // ========================================================================== + /** * Add a chore procedure to the executor * @param chore the chore to add */ public void addChore(final ProcedureInMemoryChore chore) { - chore.setState(ProcedureState.RUNNABLE); - waitingTimeout.add(new DelayedContainer(chore)); + chore.setState(ProcedureState.WAITING_TIMEOUT); + timeoutExecutor.add(chore); } /** @@ -680,9 +637,13 @@ public class ProcedureExecutor<TEnvironment> { */ public boolean removeChore(final ProcedureInMemoryChore chore) { chore.setState(ProcedureState.FINISHED); - return waitingTimeout.remove(new DelayedContainer(chore)); + return timeoutExecutor.remove(chore); } + // ========================================================================== + // Submit/Abort Procedure + // ========================================================================== + /** * Add a new root-procedure to the executor. * @param proc the new procedure to execute. @@ -699,20 +660,17 @@ public class ProcedureExecutor<TEnvironment> { * @param nonce * @return the procedure id, that can be used to monitor the operation */ - public long submitProcedure( - final Procedure proc, - final long nonceGroup, - final long nonce) { - Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); - Preconditions.checkArgument(isRunning()); + public long submitProcedure(final Procedure proc, final long nonceGroup, final long nonce) { Preconditions.checkArgument(lastProcId.get() >= 0); - Preconditions.checkArgument(!proc.hasParent()); + Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); + Preconditions.checkArgument(isRunning(), "executor not running"); + Preconditions.checkArgument(!proc.hasParent(), "unexpected parent", proc); if (this.checkOwnerSet) { - Preconditions.checkArgument(proc.hasOwner()); + Preconditions.checkArgument(proc.hasOwner(), "missing owner"); } // Initialize the Procedure ID - long currentProcId = nextProcId(); + final long currentProcId = nextProcId(); proc.setProcId(currentProcId); // Check whether the proc exists. If exist, just return the proc id. @@ -747,6 +705,41 @@ public class ProcedureExecutor<TEnvironment> { return currentProcId; } + /** + * Send an abort notification the specified procedure. + * Depending on the procedure implementation the abort can be considered or ignored. + * @param procId the procedure to abort + * @return true if the procedure exist and has received the abort, otherwise false. + */ + public boolean abort(final long procId) { + return abort(procId, true); + } + + /** + * Send an abort notification the specified procedure. + * Depending on the procedure implementation the abort can be considered or ignored. + * @param procId the procedure to abort + * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? + * @return true if the procedure exist and has received the abort, otherwise false. + */ + public boolean abort(final long procId, final boolean mayInterruptIfRunning) { + final Procedure proc = procedures.get(procId); + if (proc != null) { + if (!mayInterruptIfRunning && proc.wasExecuted()) { + return false; + } + return proc.abort(getEnvironment()); + } + return false; + } + + // ========================================================================== + // Executor query helpers + // ========================================================================== + public Procedure getProcedure(final long procId) { + return procedures.get(procId); + } + public ProcedureInfo getResult(final long procId) { return completed.get(procId); } @@ -768,7 +761,7 @@ public class ProcedureExecutor<TEnvironment> { * @return true if the procedure execution is started, otherwise false. */ public boolean isStarted(final long procId) { - Procedure proc = procedures.get(procId); + final Procedure proc = procedures.get(procId); if (proc == null) { return completed.get(procId) != null; } @@ -780,7 +773,7 @@ public class ProcedureExecutor<TEnvironment> { * @param procId the ID of the procedure to remove */ public void removeResult(final long procId) { - ProcedureInfo result = completed.get(procId); + final ProcedureInfo result = completed.get(procId); if (result == null) { assert !procedures.containsKey(procId) : "procId=" + procId + " is still running"; if (LOG.isDebugEnabled()) { @@ -793,33 +786,16 @@ public class ProcedureExecutor<TEnvironment> { result.setClientAckTime(EnvironmentEdgeManager.currentTime()); } - /** - * Send an abort notification the specified procedure. - * Depending on the procedure implementation the abort can be considered or ignored. - * @param procId the procedure to abort - * @return true if the procedure exist and has received the abort, otherwise false. - */ - public boolean abort(final long procId) { - return abort(procId, true); - } - - /** - * Send an abort notification the specified procedure. - * Depending on the procedure implementation the abort can be considered or ignored. - * @param procId the procedure to abort - * @param mayInterruptIfRunning if the proc completed at least one step, should it be aborted? - * @return true if the procedure exist and has received the abort, otherwise false. - */ - public boolean abort(final long procId, final boolean mayInterruptIfRunning) { - Procedure proc = procedures.get(procId); - if (proc != null) { - if (!mayInterruptIfRunning && proc.wasExecuted()) { - return false; - } else { - return proc.abort(getEnvironment()); + public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) { + ProcedureInfo result = completed.get(procId); + Procedure proc = null; + if (result == null) { + proc = procedures.get(procId); + if (proc == null) { + result = completed.get(procId); } } - return false; + return new Pair(result, proc); } /** @@ -830,15 +806,14 @@ public class ProcedureExecutor<TEnvironment> { * false otherwise or the owner is unknown. */ public boolean isProcedureOwner(final long procId, final User user) { - if (user == null) { - return false; - } + if (user == null) return false; - Procedure proc = procedures.get(procId); + final Procedure proc = procedures.get(procId); if (proc != null) { return proc.getOwner().equals(user.getShortName()); } - ProcedureInfo procInfo = completed.get(procId); + + final ProcedureInfo procInfo = completed.get(procId); if (procInfo == null) { // Procedure either does not exist or has already completed and got cleaned up. // At this time, we cannot check the owner of the procedure @@ -847,50 +822,113 @@ public class ProcedureExecutor<TEnvironment> { return ProcedureInfo.isProcedureOwner(procInfo, user); } - public Map<Long, ProcedureInfo> getResults() { - return Collections.unmodifiableMap(completed); + /** + * List procedures. + * @return the procedures in a list + */ + public List<ProcedureInfo> listProcedures() { + final List<ProcedureInfo> procedureLists = + new ArrayList<ProcedureInfo>(procedures.size() + completed.size()); + for (Map.Entry<Long, Procedure> p: procedures.entrySet()) { + procedureLists.add(ProcedureUtil.convertToProcedureInfo(p.getValue())); + } + for (Map.Entry<Long, ProcedureInfo> e: completed.entrySet()) { + // Note: The procedure could show up twice in the list with different state, as + // it could complete after we walk through procedures list and insert into + // procedureList - it is ok, as we will use the information in the ProcedureInfo + // to figure it out; to prevent this would increase the complexity of the logic. + procedureLists.add(e.getValue()); + } + return procedureLists; } - public Procedure getProcedure(final long procId) { - return procedures.get(procId); + // ========================================================================== + // Listeners helpers + // ========================================================================== + public void registerListener(ProcedureExecutorListener listener) { + this.listeners.add(listener); } - protected ProcedureScheduler getScheduler() { - return scheduler; + public boolean unregisterListener(ProcedureExecutorListener listener) { + return this.listeners.remove(listener); } - /** - * Execution loop (N threads) - * while the executor is in a running state, - * fetch a procedure from the scheduler queue and start the execution. - */ - private void execLoop() { - while (isRunning()) { - Procedure proc = scheduler.poll(); - if (proc == null) continue; + private void sendProcedureLoadedNotification(final long procId) { + if (!this.listeners.isEmpty()) { + for (ProcedureExecutorListener listener: this.listeners) { + try { + listener.procedureLoaded(procId); + } catch (Throwable e) { + LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); + } + } + } + } - try { - activeExecutorCount.incrementAndGet(); - execLoop(proc); - } finally { - activeExecutorCount.decrementAndGet(); + private void sendProcedureAddedNotification(final long procId) { + if (!this.listeners.isEmpty()) { + for (ProcedureExecutorListener listener: this.listeners) { + try { + listener.procedureAdded(procId); + } catch (Throwable e) { + LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); + } } } } - private void execLoop(Procedure proc) { - if (LOG.isTraceEnabled()) { - LOG.trace("Trying to start the execution of " + proc); + private void sendProcedureFinishedNotification(final long procId) { + if (!this.listeners.isEmpty()) { + for (ProcedureExecutorListener listener: this.listeners) { + try { + listener.procedureFinished(procId); + } catch (Throwable e) { + LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); + } + } + } + } + + // ========================================================================== + // Procedure IDs helpers + // ========================================================================== + private long nextProcId() { + long procId = lastProcId.incrementAndGet(); + if (procId < 0) { + while (!lastProcId.compareAndSet(procId, 0)) { + procId = lastProcId.get(); + if (procId >= 0) + break; + } + while (procedures.containsKey(procId)) { + procId = lastProcId.incrementAndGet(); + } } + assert procId >= 0 : "Invalid procId " + procId; + return procId; + } + + @VisibleForTesting + protected long getLastProcId() { + return lastProcId.get(); + } - Long rootProcId = getRootProcedureId(proc); + private Long getRootProcedureId(Procedure proc) { + return Procedure.getRootProcedureId(procedures, proc); + } + + // ========================================================================== + // Executions + // ========================================================================== + private void executeProcedure(final Procedure proc) { + final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback executeRollback(proc); return; } - RootProcedureState procStack = rollbackStack.get(rootProcId); + final RootProcedureState procStack = rollbackStack.get(rootProcId); if (procStack == null) return; do { @@ -967,56 +1005,6 @@ public class ProcedureExecutor<TEnvironment> { } } - private void timeoutLoop() { - while (isRunning()) { - Procedure proc; - try { - proc = waitingTimeout.take().proc; - } catch (InterruptedException e) { - // Just consume the interruption. - continue; - } - if (proc == null) { // POISON to stop - break; - } - - // ---------------------------------------------------------------------------- - // TODO-MAYBE: Should we provide a notification to the store with the - // full set of procedures pending and completed to write a compacted - // version of the log (in case is a log)? - // In theory no, procedures are have a short life, so at some point the store - // will have the tracker saying everything is in the last log. - // ---------------------------------------------------------------------------- - - // The ProcedureInMemoryChore is a special case, and it acts as a chore. - // instead of bringing the Chore class in, we reuse this timeout thread for - // this special case. - if (proc instanceof ProcedureInMemoryChore) { - if (proc.isRunnable()) { - try { - ((ProcedureInMemoryChore)proc).periodicExecute(getEnvironment()); - } catch (Throwable e) { - LOG.error("Ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); - } - proc.updateTimestamp(); - if (proc.isRunnable()) waitingTimeout.add(new DelayedContainer(proc)); - } - continue; - } - - // The procedure received a timeout. if the procedure itself does not handle it, - // call abort() and add the procedure back in the queue for rollback. - if (proc.setTimeoutFailure(getEnvironment())) { - long rootProcId = Procedure.getRootProcedureId(procedures, proc); - RootProcedureState procStack = rollbackStack.get(rootProcId); - procStack.abort(); - store.update(proc); - scheduler.addFront(proc); - continue; - } - } - } - /** * Execute the rollback of the full procedure stack. * Once the procedure is rolledback, the root-procedure will be visible as @@ -1189,38 +1177,10 @@ public class ProcedureExecutor<TEnvironment> { reExecute = true; } else { // yield the current procedure, and make the subprocedure runnable - for (int i = 0; i < subprocs.length; ++i) { - Procedure subproc = subprocs[i]; - if (subproc == null) { - String msg = "subproc[" + i + "] is null, aborting the procedure"; - procedure.setFailure(new RemoteProcedureException(msg, - new IllegalArgumentIOException(msg))); - subprocs = null; - break; - } - - assert subproc.getState() == ProcedureState.INITIALIZING : subproc; - subproc.setParentProcId(procedure.getProcId()); - subproc.setProcId(nextProcId()); - procStack.addSubProcedure(subproc); - } - - if (!procedure.isFailed()) { - procedure.setChildrenLatch(subprocs.length); - switch (procedure.getState()) { - case RUNNABLE: - procedure.setState(ProcedureState.WAITING); - break; - case WAITING_TIMEOUT: - waitingTimeout.add(new DelayedContainer(procedure)); - break; - default: - break; - } - } + subprocs = initializeChildren(procStack, procedure, subprocs); } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { - waitingTimeout.add(new DelayedContainer(procedure)); + timeoutExecutor.add(procedure); } else if (!isSuspended) { // No subtask, so we are done procedure.setState(ProcedureState.FINISHED); @@ -1232,8 +1192,8 @@ public class ProcedureExecutor<TEnvironment> { // allows to kill the executor before something is stored to the wal. // useful to test the procedure recovery. - if (testing != null && testing.shouldKillBeforeStoreUpdate()) { - LOG.debug("TESTING: Kill before store update"); + if (testing != null && !isSuspended && testing.shouldKillBeforeStoreUpdate()) { + LOG.debug("TESTING: Kill before store update: " + procedure); stop(); return; } @@ -1242,12 +1202,10 @@ public class ProcedureExecutor<TEnvironment> { updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting - if (!store.isRunning()) { - return; - } + if (!store.isRunning()) return; // if the procedure is kind enough to pass the slot to someone else, yield - if (procedure.getState() == ProcedureState.RUNNABLE && + if (procedure.isRunnable() && !isSuspended && procedure.isYieldAfterExecutionStep(getEnvironment())) { scheduler.yield(procedure); return; @@ -1258,34 +1216,81 @@ public class ProcedureExecutor<TEnvironment> { // Submit the new subprocedures if (subprocs != null && !procedure.isFailed()) { - for (int i = 0; i < subprocs.length; ++i) { - Procedure subproc = subprocs[i]; - assert !procedures.containsKey(subproc.getProcId()); - procedures.put(subproc.getProcId(), subproc); - scheduler.addFront(subproc); - } + submitChildrenProcedures(subprocs); } + // if the procedure is complete and has a parent, count down the children latch if (procedure.isFinished() && procedure.hasParent()) { - Procedure parent = procedures.get(procedure.getParentProcId()); - if (parent == null) { - assert procStack.isRollingback(); - return; + countDownChildren(procStack, procedure); + } + } + + private Procedure[] initializeChildren(final RootProcedureState procStack, + final Procedure procedure, final Procedure[] subprocs) { + assert subprocs != null : "expected subprocedures"; + final long rootProcId = getRootProcedureId(procedure); + for (int i = 0; i < subprocs.length; ++i) { + final Procedure subproc = subprocs[i]; + if (subproc == null) { + String msg = "subproc[" + i + "] is null, aborting the procedure"; + procedure.setFailure(new RemoteProcedureException(msg, + new IllegalArgumentIOException(msg))); + return null; } - // If this procedure is the last child awake the parent procedure - if (LOG.isTraceEnabled()) { - LOG.trace(parent + " child is done: " + procedure); + assert subproc.getState() == ProcedureState.INITIALIZING : subproc; + subproc.setParentProcId(procedure.getProcId()); + subproc.setRootProcId(rootProcId); + subproc.setProcId(nextProcId()); + procStack.addSubProcedure(subproc); + } + + if (!procedure.isFailed()) { + procedure.setChildrenLatch(subprocs.length); + switch (procedure.getState()) { + case RUNNABLE: + procedure.setState(ProcedureState.WAITING); + break; + case WAITING_TIMEOUT: + timeoutExecutor.add(procedure); + break; + default: + break; } - if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { - parent.setState(ProcedureState.RUNNABLE); - store.update(parent); - scheduler.addFront(parent); - if (LOG.isTraceEnabled()) { - LOG.trace(parent + " all the children finished their work, resume."); - } - return; + } + return subprocs; + } + + private void submitChildrenProcedures(final Procedure[] subprocs) { + for (int i = 0; i < subprocs.length; ++i) { + final Procedure subproc = subprocs[i]; + assert !procedures.containsKey(subproc.getProcId()); + procedures.put(subproc.getProcId(), subproc); + scheduler.addFront(subproc); + } + } + + private void countDownChildren(final RootProcedureState procStack, final Procedure procedure) { + final Procedure parent = procedures.get(procedure.getParentProcId()); + if (parent == null) { + assert procStack.isRollingback(); + return; + } + + // If this procedure is the last child awake the parent procedure + final boolean isTraceEnabled = LOG.isTraceEnabled(); + if (isTraceEnabled) { + LOG.trace(parent + " child is done: " + procedure); + } + + if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { + parent.setState(ProcedureState.RUNNABLE); + store.update(parent); + scheduler.addFront(parent); + if (isTraceEnabled) { + LOG.trace(parent + " all the children finished their work, resume."); } + return; } } @@ -1329,66 +1334,6 @@ public class ProcedureExecutor<TEnvironment> { // (The interrupted procedure will be retried on the next run) } - private void sendProcedureLoadedNotification(final long procId) { - if (!this.listeners.isEmpty()) { - for (ProcedureExecutorListener listener: this.listeners) { - try { - listener.procedureLoaded(procId); - } catch (Throwable e) { - LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); - } - } - } - } - - private void sendProcedureAddedNotification(final long procId) { - if (!this.listeners.isEmpty()) { - for (ProcedureExecutorListener listener: this.listeners) { - try { - listener.procedureAdded(procId); - } catch (Throwable e) { - LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); - } - } - } - } - - private void sendProcedureFinishedNotification(final long procId) { - if (!this.listeners.isEmpty()) { - for (ProcedureExecutorListener listener: this.listeners) { - try { - listener.procedureFinished(procId); - } catch (Throwable e) { - LOG.error("The listener " + listener + " had an error: " + e.getMessage(), e); - } - } - } - } - - private long nextProcId() { - long procId = lastProcId.incrementAndGet(); - if (procId < 0) { - while (!lastProcId.compareAndSet(procId, 0)) { - procId = lastProcId.get(); - if (procId >= 0) - break; - } - while (procedures.containsKey(procId)) { - procId = lastProcId.incrementAndGet(); - } - } - return procId; - } - - @VisibleForTesting - protected long getLastProcId() { - return lastProcId.get(); - } - - private Long getRootProcedureId(Procedure proc) { - return Procedure.getRootProcedureId(procedures, proc); - } - private void execCompletionCleanup(final Procedure proc) { final TEnvironment env = getEnvironment(); if (proc.holdLock(env) && proc.hasLock(env)) { @@ -1428,15 +1373,294 @@ public class ProcedureExecutor<TEnvironment> { sendProcedureFinishedNotification(proc.getProcId()); } - public Pair<ProcedureInfo, Procedure> getResultOrProcedure(final long procId) { - ProcedureInfo result = completed.get(procId); - Procedure proc = null; - if (result == null) { - proc = procedures.get(procId); - if (proc == null) { - result = completed.get(procId); + // ========================================================================== + // Worker Thread + // ========================================================================== + private final class WorkerThread extends StoppableThread { + private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); + + public WorkerThread(final ThreadGroup group) { + super(group, "ProcedureExecutorWorker-" + workerId.incrementAndGet()); + } + + @Override + public void sendStopSignal() { + scheduler.signalAll(); + } + + @Override + public void run() { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + long lastUpdate = EnvironmentEdgeManager.currentTime(); + while (isRunning() && keepAlive(lastUpdate)) { + final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (procedure == null) continue; + + activeExecutorCount.incrementAndGet(); + executionStartTime.set(EnvironmentEdgeManager.currentTime()); + try { + if (isTraceEnabled) { + LOG.trace("Trying to start the execution of " + procedure); + } + executeProcedure(procedure); + } finally { + activeExecutorCount.decrementAndGet(); + lastUpdate = EnvironmentEdgeManager.currentTime(); + executionStartTime.set(Long.MAX_VALUE); + } } + LOG.debug("worker thread terminated " + this); + workerThreads.remove(this); + } + + /** + * @return the time since the current procedure is running + */ + public long getCurrentRunTime() { + return EnvironmentEdgeManager.currentTime() - executionStartTime.get(); + } + + private boolean keepAlive(final long lastUpdate) { + if (workerThreads.size() <= corePoolSize) return true; + return (EnvironmentEdgeManager.currentTime() - lastUpdate) < keepAliveTime; + } + } + + // ========================================================================== + // Timeout Thread + // ========================================================================== + private final class TimeoutExecutorThread extends StoppableThread { + private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); + + public TimeoutExecutorThread(final ThreadGroup group) { + super(group, "ProcedureTimeoutExecutor"); + } + + @Override + public void sendStopSignal() { + queue.add(DelayedUtil.DELAYED_POISON); + } + + @Override + public void run() { + final boolean isTraceEnabled = LOG.isTraceEnabled(); + while (isRunning()) { + final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); + if (task == null || task == DelayedUtil.DELAYED_POISON) { + // the executor may be shutting down, + // and the task is just the shutdown request + continue; + } + + if (isTraceEnabled) { + LOG.trace("Trying to start the execution of " + task); + } + + // execute the task + if (task instanceof InlineChore) { + execInlineChore((InlineChore)task); + } else if (task instanceof DelayedProcedure) { + execDelayedProcedure((DelayedProcedure)task); + } else { + LOG.error("CODE-BUG unknown timeout task type " + task); + } + } + } + + public void add(final InlineChore chore) { + chore.refreshTimeout(); + queue.add(chore); + } + + public void add(final Procedure procedure) { + assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; + queue.add(new DelayedProcedure(procedure)); + } + + public boolean remove(final Procedure procedure) { + return queue.remove(new DelayedProcedure(procedure)); + } + + private void execInlineChore(final InlineChore chore) { + chore.run(); + add(chore); + } + + private void execDelayedProcedure(final DelayedProcedure delayed) { + // TODO: treat this as a normal procedure, add it to the scheduler and + // let one of the workers handle it. + // Today we consider ProcedureInMemoryChore as InlineChores + final Procedure procedure = delayed.getObject(); + if (procedure instanceof ProcedureInMemoryChore) { + executeInMemoryChore((ProcedureInMemoryChore)procedure); + // if the procedure is in a waiting state again, put it back in the queue + procedure.updateTimestamp(); + if (procedure.isWaiting()) { + delayed.setTimeoutTimestamp(procedure.getTimeoutTimestamp()); + queue.add(delayed); + } + } else { + executeTimedoutProcedure(procedure); + } + } + + private void executeInMemoryChore(final ProcedureInMemoryChore chore) { + if (!chore.isWaiting()) return; + + // The ProcedureInMemoryChore is a special case, and it acts as a chore. + // instead of bringing the Chore class in, we reuse this timeout thread for + // this special case. + try { + chore.periodicExecute(getEnvironment()); + } catch (Throwable e) { + LOG.error("Ignoring " + chore + " exception: " + e.getMessage(), e); + } + } + + private void executeTimedoutProcedure(final Procedure proc) { + // The procedure received a timeout. if the procedure itself does not handle it, + // call abort() and add the procedure back in the queue for rollback. + if (proc.setTimeoutFailure(getEnvironment())) { + long rootProcId = Procedure.getRootProcedureId(procedures, proc); + RootProcedureState procStack = rollbackStack.get(rootProcId); + procStack.abort(); + store.update(proc); + scheduler.addFront(proc); + } + } + } + + private static final class DelayedProcedure + extends DelayedUtil.DelayedContainerWithTimestamp<Procedure> { + public DelayedProcedure(final Procedure procedure) { + super(procedure, procedure.getTimeoutTimestamp()); + } + } + + private static abstract class StoppableThread extends Thread { + public StoppableThread(final ThreadGroup group, final String name) { + super(group, name); + } + + public abstract void sendStopSignal(); + + public void awaitTermination() { + try { + final long startTime = EnvironmentEdgeManager.currentTime(); + for (int i = 0; isAlive(); ++i) { + sendStopSignal(); + join(250); + if (i > 0 && (i % 8) == 0) { + LOG.warn("waiting termination of thread " + getName() + ", " + + StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); + } + } + } catch (InterruptedException e) { + LOG.warn(getName() + " join wait got interrupted", e); + } + } + } + + // ========================================================================== + // Inline Chores (executors internal chores) + // ========================================================================== + private static abstract class InlineChore extends DelayedUtil.DelayedObject implements Runnable { + private long timeout; + + public abstract int getTimeoutInterval(); + + protected void refreshTimeout() { + this.timeout = EnvironmentEdgeManager.currentTime() + getTimeoutInterval(); + } + + @Override + public long getTimeoutTimestamp() { + return timeout; + } + } + + // ---------------------------------------------------------------------------- + // TODO-MAYBE: Should we provide a InlineChore to notify the store with the + // full set of procedures pending and completed to write a compacted + // version of the log (in case is a log)? + // In theory no, procedures are have a short life, so at some point the store + // will have the tracker saying everything is in the last log. + // ---------------------------------------------------------------------------- + + private final class WorkerMonitor extends InlineChore { + public static final String WORKER_MONITOR_INTERVAL_CONF_KEY = + "hbase.procedure.worker.monitor.interval.msec"; + private static final int DEFAULT_WORKER_MONITOR_INTERVAL = 5000; // 5sec + + public static final String WORKER_STUCK_THRESHOLD_CONF_KEY = + "hbase.procedure.worker.stuck.threshold.msec"; + private static final int DEFAULT_WORKER_STUCK_THRESHOLD = 10000; // 10sec + + public static final String WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY = + "hbase.procedure.worker.add.stuck.percentage"; + private static final float DEFAULT_WORKER_ADD_STUCK_PERCENTAGE = 0.5f; // 50% stuck + + private float addWorkerStuckPercentage = DEFAULT_WORKER_ADD_STUCK_PERCENTAGE; + private int timeoutInterval = DEFAULT_WORKER_MONITOR_INTERVAL; + private int stuckThreshold = DEFAULT_WORKER_STUCK_THRESHOLD; + + public WorkerMonitor() { + refreshConfig(); + } + + @Override + public void run() { + final int stuckCount = checkForStuckWorkers(); + checkThreadCount(stuckCount); + + // refresh interval (poor man dynamic conf update) + refreshConfig(); + } + + private int checkForStuckWorkers() { + // check if any of the worker is stuck + int stuckCount = 0; + for (WorkerThread worker: workerThreads) { + if (worker.getCurrentRunTime() < stuckThreshold) { + continue; + } + + // WARN the worker is stuck + stuckCount++; + LOG.warn("found worker stuck " + worker + + " run time " + StringUtils.humanTimeDiff(worker.getCurrentRunTime())); + } + return stuckCount; + } + + private void checkThreadCount(final int stuckCount) { + // nothing to do if there are no runnable tasks + if (stuckCount < 1 || !scheduler.hasRunnables()) return; + + // add a new thread if the worker stuck percentage exceed the threshold limit + // and every handler is active. + final float stuckPerc = ((float)stuckCount) / workerThreads.size(); + if (stuckPerc >= addWorkerStuckPercentage && + activeExecutorCount.get() == workerThreads.size()) { + final WorkerThread worker = new WorkerThread(threadGroup); + workerThreads.add(worker); + worker.start(); + LOG.debug("added a new worker thread " + worker); + } + } + + private void refreshConfig() { + addWorkerStuckPercentage = conf.getFloat(WORKER_ADD_STUCK_PERCENTAGE_CONF_KEY, + DEFAULT_WORKER_ADD_STUCK_PERCENTAGE); + timeoutInterval = conf.getInt(WORKER_MONITOR_INTERVAL_CONF_KEY, + DEFAULT_WORKER_MONITOR_INTERVAL); + stuckThreshold = conf.getInt(WORKER_STUCK_THRESHOLD_CONF_KEY, + DEFAULT_WORKER_STUCK_THRESHOLD); + } + + @Override + public int getTimeoutInterval() { + return timeoutInterval; } - return new Pair(result, proc); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java new file mode 100644 index 0000000..ea34c49 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2.util; + +import java.util.Objects; +import java.util.concurrent.DelayQueue; +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class DelayedUtil { + private DelayedUtil() { } + + public interface DelayedWithTimeout extends Delayed { + long getTimeoutTimestamp(); + } + + public static final DelayedWithTimeout DELAYED_POISON = new DelayedWithTimeout() { + @Override + public long getTimeoutTimestamp() { + return 0; + } + + @Override + public long getDelay(final TimeUnit unit) { + return 0; + } + + @Override + public int compareTo(final Delayed o) { + return Long.compare(0, DelayedUtil.getTimeoutTimestamp(o)); + } + + @Override + public boolean equals(final Object other) { + return this == other; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(POISON)"; + } + }; + + public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> queue) { + try { + return queue.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return null; + } + } + + public static long getRemainingTime(final TimeUnit resultUnit, final long timeoutTime) { + final long currentTime = EnvironmentEdgeManager.currentTime(); + if (currentTime >= timeoutTime) { + return 0; + } + return resultUnit.convert(timeoutTime - currentTime, TimeUnit.MILLISECONDS); + } + + public static int compareDelayed(final Delayed o1, final Delayed o2) { + return Long.compare(getTimeoutTimestamp(o1), getTimeoutTimestamp(o2)); + } + + private static long getTimeoutTimestamp(final Delayed o) { + assert o instanceof DelayedWithTimeout : "expected DelayedWithTimeout instance, got " + o; + return ((DelayedWithTimeout)o).getTimeoutTimestamp(); + } + + public static abstract class DelayedObject implements DelayedWithTimeout { + @Override + public long getDelay(final TimeUnit unit) { + return DelayedUtil.getRemainingTime(unit, getTimeoutTimestamp()); + } + + @Override + public int compareTo(final Delayed other) { + return DelayedUtil.compareDelayed(this, other); + } + } + + public static abstract class DelayedContainer<T> extends DelayedObject { + private final T object; + + public DelayedContainer(final T object) { + this.object = object; + } + + public T getObject() { + return this.object; + } + + @Override + public boolean equals(final Object other) { + if (other == this) return true; + if (!(other instanceof DelayedContainer)) return false; + return Objects.equals(getObject(), ((DelayedContainer)other).getObject()); + } + + @Override + public int hashCode() { + return object != null ? object.hashCode() : 0; + } + + @Override + public String toString() { + return getClass().getSimpleName() + "(" + getObject() + ")"; + } + } + + public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> { + private long timeoutTimestamp; + + public DelayedContainerWithTimestamp(final T object, final long timeoutTimestamp) { + super(object); + setTimeoutTimestamp(timeoutTimestamp); + } + + @Override + public long getTimeoutTimestamp() { + return timeoutTimestamp; + } + + public void setTimeoutTimestamp(final long timeoutTimestamp) { + this.timeoutTimestamp = timeoutTimestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index f2c7e6b..0b4e4ed 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -74,8 +74,8 @@ public class ProcedureTestingUtility { public static <TEnv> void restart(ProcedureExecutor<TEnv> procExecutor, Runnable beforeStartAction, boolean failOnCorrupted) throws Exception { ProcedureStore procStore = procExecutor.getStore(); - int storeThreads = procExecutor.getNumThreads(); - int execThreads = procExecutor.getNumThreads(); + int storeThreads = procExecutor.getCorePoolSize(); + int execThreads = procExecutor.getCorePoolSize(); // stop procExecutor.stop(); procExecutor.join(); http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java new file mode 100644 index 0000000..851dc3e --- /dev/null +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureExecutor.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseCommonTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility; +import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility.NoopProcedure; +import org.apache.hadoop.hbase.procedure2.store.NoopProcedureStore; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.testclassification.MasterTests; +import org.apache.hadoop.hbase.util.Threads; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.assertEquals; + +@Category({MasterTests.class, SmallTests.class}) +public class TestProcedureExecutor { + private static final Log LOG = LogFactory.getLog(TestProcedureExecutor.class); + + private TestProcEnv procEnv; + private NoopProcedureStore procStore; + private ProcedureExecutor<TestProcEnv> procExecutor; + + private HBaseCommonTestingUtility htu; + + @Before + public void setUp() throws Exception { + htu = new HBaseCommonTestingUtility(); + + // NOTE: The executor will be created by each test + procEnv = new TestProcEnv(); + procStore = new NoopProcedureStore(); + procStore.start(1); + } + + @After + public void tearDown() throws Exception { + procExecutor.stop(); + procStore.stop(false); + procExecutor.join(); + } + + private void createNewExecutor(final Configuration conf, final int numThreads) throws Exception { + procExecutor = new ProcedureExecutor(conf, procEnv, procStore); + procExecutor.start(numThreads, true); + } + + @Test(timeout=60000) + public void testWorkerStuck() throws Exception { + // replace the executor + final Configuration conf = new Configuration(htu.getConfiguration()); + conf.setFloat("hbase.procedure.worker.add.stuck.percentage", 0.5f); + conf.setInt("hbase.procedure.worker.monitor.interval.msec", 500); + conf.setInt("hbase.procedure.worker.stuck.threshold.msec", 750); + + final int NUM_THREADS = 2; + createNewExecutor(conf, NUM_THREADS); + + Semaphore latch1 = new Semaphore(2); + latch1.acquire(2); + BusyWaitProcedure busyProc1 = new BusyWaitProcedure(latch1); + + Semaphore latch2 = new Semaphore(2); + latch2.acquire(2); + BusyWaitProcedure busyProc2 = new BusyWaitProcedure(latch2); + + long busyProcId1 = procExecutor.submitProcedure(busyProc1); + long busyProcId2 = procExecutor.submitProcedure(busyProc2); + long otherProcId = procExecutor.submitProcedure(new NoopProcedure()); + + // wait until a new worker is being created + int threads1 = waitThreadCount(NUM_THREADS + 1); + LOG.info("new threads got created: " + (threads1 - NUM_THREADS)); + assertEquals(NUM_THREADS + 1, threads1); + + ProcedureTestingUtility.waitProcedure(procExecutor, otherProcId); + assertEquals(true, procExecutor.isFinished(otherProcId)); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, otherProcId); + + assertEquals(true, procExecutor.isRunning()); + assertEquals(false, procExecutor.isFinished(busyProcId1)); + assertEquals(false, procExecutor.isFinished(busyProcId2)); + + // terminate the busy procedures + latch1.release(); + latch2.release(); + + LOG.info("set keep alive and wait threads being removed"); + procExecutor.setKeepAliveTime(500L, TimeUnit.MILLISECONDS); + int threads2 = waitThreadCount(NUM_THREADS); + LOG.info("threads got removed: " + (threads1 - threads2)); + assertEquals(NUM_THREADS, threads2); + + // terminate the busy procedures + latch1.release(); + latch2.release(); + + // wait for all procs to complete + ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId1); + ProcedureTestingUtility.waitProcedure(procExecutor, busyProcId2); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId1); + ProcedureTestingUtility.assertProcNotFailed(procExecutor, busyProcId2); + } + + private int waitThreadCount(final int expectedThreads) { + while (procExecutor.isRunning()) { + if (procExecutor.getWorkerThreadCount() == expectedThreads) { + break; + } + LOG.debug("waiting for thread count=" + expectedThreads + + " current=" + procExecutor.getWorkerThreadCount()); + Threads.sleepWithoutInterrupt(250); + } + return procExecutor.getWorkerThreadCount(); + } + + public static class BusyWaitProcedure extends NoopProcedure<TestProcEnv> { + private final Semaphore latch; + + public BusyWaitProcedure(final Semaphore latch) { + this.latch = latch; + } + + @Override + protected Procedure[] execute(final TestProcEnv env) { + try { + LOG.info("worker started " + this); + if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) { + throw new Exception("waited too long"); + } + + LOG.info("worker step 2 " + this); + if (!latch.tryAcquire(1, 30, TimeUnit.SECONDS)) { + throw new Exception("waited too long"); + } + } catch (Exception e) { + LOG.error("got unexpected exception", e); + setFailure("BusyWaitProcedure", e); + } + return null; + } + } + + private class TestProcEnv { } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c6e9dabe/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java index 8bc8fa8..50ccfa6 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureInMemoryChore.java @@ -76,17 +76,17 @@ public class TestProcedureInMemoryChore { CountDownLatch latch = new CountDownLatch(nCountDown); TestLatchChore chore = new TestLatchChore(timeoutMSec, latch); procExecutor.addChore(chore); - assertTrue(chore.isRunnable()); + assertTrue(chore.isWaiting()); latch.await(); // remove the chore and verify it is no longer executed - assertTrue(chore.isRunnable()); + assertTrue(chore.isWaiting()); procExecutor.removeChore(chore); latch = new CountDownLatch(nCountDown); chore.setLatch(latch); latch.await(timeoutMSec * nCountDown, TimeUnit.MILLISECONDS); LOG.info("chore latch count=" + latch.getCount()); - assertFalse(chore.isRunnable()); + assertFalse(chore.isWaiting()); assertTrue("latchCount=" + latch.getCount(), latch.getCount() > 0); } @@ -104,6 +104,7 @@ public class TestProcedureInMemoryChore { @Override protected void periodicExecute(final TestProcEnv env) { + LOG.info("periodic execute " + this); latch.countDown(); } }