http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index b648cf2..1bb6118 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -32,8 +32,6 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.Stream; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.DelayQueue; @@ -115,11 +113,9 @@ public class ProcedureExecutor<TEnvironment> { * Internal cleaner that removes the completed procedure results after a TTL. * NOTE: This is a special case handled in timeoutLoop(). * - * <p>Since the client code looks more or less like: - * <pre> + * Since the client code looks more or less like: * procId = master.doOperation() * while (master.getProcResult(procId) == ProcInProgress); - * </pre> * The master should not throw away the proc result as soon as the procedure is done * but should wait a result request from the client (see executor.removeResult(procId)) * The client will call something like master.isProcDone() or master.getProcResult() @@ -484,10 +480,10 @@ public class ProcedureExecutor<TEnvironment> { // We have numThreads executor + one timer thread used for timing out // procedures and triggering periodic procedures. this.corePoolSize = numThreads; - LOG.info("Starting ProcedureExecutor Worker threads (ProcExecWrkr)=" + corePoolSize); + LOG.info("Starting executor worker threads=" + corePoolSize); // Create the Thread Group for the executors - threadGroup = new ThreadGroup("ProcExecThrdGrp"); + threadGroup = new ThreadGroup("ProcedureExecutor"); // Create the timeout executor timeoutExecutor = new TimeoutExecutorThread(threadGroup); @@ -1081,16 +1077,13 @@ public class ProcedureExecutor<TEnvironment> { final Long rootProcId = getRootProcedureId(proc); if (rootProcId == null) { // The 'proc' was ready to run but the root procedure was rolledback - LOG.warn("Rollback because parent is done/rolledback proc=" + proc); executeRollback(proc); return; } final RootProcedureState procStack = rollbackStack.get(rootProcId); - if (procStack == null) { - LOG.warn("RootProcedureState is null for " + proc.getProcId()); - return; - } + if (procStack == null) return; + do { // Try to acquire the execution if (!procStack.acquire(proc)) { @@ -1104,7 +1097,6 @@ public class ProcedureExecutor<TEnvironment> { scheduler.yield(proc); break; case LOCK_EVENT_WAIT: - LOG.info("LOCK_EVENT_WAIT rollback..." + proc); procStack.unsetRollback(); break; default: @@ -1122,7 +1114,6 @@ public class ProcedureExecutor<TEnvironment> { scheduler.yield(proc); break; case LOCK_EVENT_WAIT: - LOG.info("LOCK_EVENT_WAIT can't rollback child running?..." + proc); break; default: throw new UnsupportedOperationException(); @@ -1134,21 +1125,16 @@ public class ProcedureExecutor<TEnvironment> { // Execute the procedure assert proc.getState() == ProcedureState.RUNNABLE : proc; - // Note that lock is NOT about concurrency but rather about ensuring - // ownership of a procedure of an entity such as a region or table - LockState lockState = acquireLock(proc); - switch (lockState) { + switch (acquireLock(proc)) { case LOCK_ACQUIRED: execProcedure(procStack, proc); releaseLock(proc, false); break; case LOCK_YIELD_WAIT: - LOG.info(lockState + " " + proc); scheduler.yield(proc); break; case LOCK_EVENT_WAIT: - // Someone will wake us up when the lock is available - LOG.debug(lockState + " " + proc); + // someone will wake us up when the lock is available break; default: throw new UnsupportedOperationException(); @@ -1164,7 +1150,10 @@ public class ProcedureExecutor<TEnvironment> { if (proc.isSuccess()) { // update metrics on finishing the procedure proc.updateMetricsOnFinish(getEnvironment(), proc.elapsedTime(), true); - LOG.info("Finish " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); + + if (LOG.isDebugEnabled()) { + LOG.debug("Finished " + proc + " in " + StringUtils.humanTimeDiff(proc.elapsedTime())); + } // Finalize the procedure state if (proc.getProcId() == rootProcId) { procedureFinished(proc); @@ -1189,7 +1178,7 @@ public class ProcedureExecutor<TEnvironment> { private void releaseLock(final Procedure proc, final boolean force) { final TEnvironment env = getEnvironment(); - // For how the framework works, we know that we will always have the lock + // for how the framework works, we know that we will always have the lock // when we call releaseLock(), so we can avoid calling proc.hasLock() if (force || !proc.holdLock(env)) { proc.doReleaseLock(env); @@ -1204,8 +1193,6 @@ public class ProcedureExecutor<TEnvironment> { private LockState executeRollback(final long rootProcId, final RootProcedureState procStack) { final Procedure rootProc = procedures.get(rootProcId); RemoteProcedureException exception = rootProc.getException(); - // TODO: This needs doc. The root proc doesn't have an exception. Maybe we are - // rolling back because the subprocedure does. Clarify. if (exception == null) { exception = procStack.getException(); rootProc.setFailure(exception); @@ -1282,7 +1269,7 @@ public class ProcedureExecutor<TEnvironment> { return LockState.LOCK_YIELD_WAIT; } catch (Throwable e) { // Catch NullPointerExceptions or similar errors... - LOG.fatal("CODE-BUG: Uncaught runtime exception fo " + proc, e); + LOG.fatal("CODE-BUG: Uncatched runtime exception for procedure: " + proc, e); } // allows to kill the executor before something is stored to the wal. @@ -1318,55 +1305,29 @@ public class ProcedureExecutor<TEnvironment> { } /** - * Executes <code>procedure</code> - * <ul> - * <li>Calls the doExecute() of the procedure - * <li>If the procedure execution didn't fail (i.e. valid user input) - * <ul> - * <li>...and returned subprocedures - * <ul><li>The subprocedures are initialized. - * <li>The subprocedures are added to the store - * <li>The subprocedures are added to the runnable queue - * <li>The procedure is now in a WAITING state, waiting for the subprocedures to complete - * </ul> - * </li> - * <li>...if there are no subprocedure - * <ul><li>the procedure completed successfully - * <li>if there is a parent (WAITING) - * <li>the parent state will be set to RUNNABLE - * </ul> - * </li> - * </ul> - * </li> - * <li>In case of failure - * <ul> - * <li>The store is updated with the new state</li> - * <li>The executor (caller of this method) will start the rollback of the procedure</li> - * </ul> - * </li> - * </ul> + * Executes the specified procedure + * - calls the doExecute() of the procedure + * - if the procedure execution didn't fail (e.g. invalid user input) + * - ...and returned subprocedures + * - the subprocedures are initialized. + * - the subprocedures are added to the store + * - the subprocedures are added to the runnable queue + * - the procedure is now in a WAITING state, waiting for the subprocedures to complete + * - ...if there are no subprocedure + * - the procedure completed successfully + * - if there is a parent (WAITING) + * - the parent state will be set to RUNNABLE + * - in case of failure + * - the store is updated with the new state + * - the executor (caller of this method) will start the rollback of the procedure */ - private void execProcedure(final RootProcedureState procStack, - final Procedure<TEnvironment> procedure) { + private void execProcedure(final RootProcedureState procStack, final Procedure procedure) { Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE); - // Procedures can suspend themselves. They skip out by throwing a ProcedureSuspendedException. - // The exception is caught below and then we hurry to the exit without disturbing state. The - // idea is that the processing of this procedure will be unsuspended later by an external event - // such the report of a region open. TODO: Currently, its possible for two worker threads - // to be working on the same procedure concurrently (locking in procedures is NOT about - // concurrency but about tying an entity to a procedure; i.e. a region to a particular - // procedure instance). This can make for issues if both threads are changing state. - // See env.getProcedureScheduler().wakeEvent(regionNode.getProcedureEvent()); - // in RegionTransitionProcedure#reportTransition for example of Procedure putting - // itself back on the scheduler making it possible for two threads running against - // the one Procedure. Might be ok if they are both doing different, idempotent sections. + // Execute the procedure boolean suspended = false; - - // Whether to 're-' -execute; run through the loop again. boolean reExecute = false; - - Procedure<TEnvironment>[] subprocs = null; + Procedure[] subprocs = null; do { reExecute = false; try { @@ -1375,20 +1336,14 @@ public class ProcedureExecutor<TEnvironment> { subprocs = null; } } catch (ProcedureSuspendedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Suspend " + procedure); - } suspended = true; } catch (ProcedureYieldException e) { if (LOG.isTraceEnabled()) { - LOG.trace("Yield " + procedure + ": " + e.getMessage(), e); + LOG.trace("Yield " + procedure + ": " + e.getMessage()); } scheduler.yield(procedure); return; } catch (InterruptedException e) { - if (LOG.isTraceEnabled()) { - LOG.trace("Yield interrupt " + procedure + ": " + e.getMessage(), e); - } handleInterruptedException(procedure, e); scheduler.yield(procedure); return; @@ -1402,26 +1357,14 @@ public class ProcedureExecutor<TEnvironment> { if (!procedure.isFailed()) { if (subprocs != null) { if (subprocs.length == 1 && subprocs[0] == procedure) { - // Procedure returned itself. Quick-shortcut for a state machine-like procedure; - // i.e. we go around this loop again rather than go back out on the scheduler queue. + // quick-shortcut for a state machine like procedure subprocs = null; reExecute = true; - if (LOG.isTraceEnabled()) { - LOG.trace("Short-circuit to next step on pid=" + procedure.getProcId()); - } } else { - // Yield the current procedure, and make the subprocedure runnable - // subprocs may come back 'null'. + // yield the current procedure, and make the subprocedure runnable subprocs = initializeChildren(procStack, procedure, subprocs); - LOG.info("Initialized subprocedures=" + - (subprocs == null? null: - Stream.of(subprocs).map(e -> "{" + e.toString() + "}"). - collect(Collectors.toList()).toString())); } } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { - if (LOG.isTraceEnabled()) { - LOG.trace("Added to timeoutExecutor " + procedure); - } timeoutExecutor.add(procedure); } else if (!suspended) { // No subtask, so we are done @@ -1445,13 +1388,12 @@ public class ProcedureExecutor<TEnvironment> { // executor thread to stop. The statement following the method call below seems to check if // store is not running, to prevent scheduling children procedures, re-execution or yield // of this procedure. This may need more scrutiny and subsequent cleanup in future - // - // Commit the transaction even if a suspend (state may have changed). Note this append - // can take a bunch of time to complete. + // Commit the transaction updateStoreOnExec(procStack, procedure, subprocs); // if the store is not running we are aborting if (!store.isRunning()) return; + // if the procedure is kind enough to pass the slot to someone else, yield if (procedure.isRunnable() && !suspended && procedure.isYieldAfterExecutionStep(getEnvironment())) { @@ -1461,14 +1403,14 @@ public class ProcedureExecutor<TEnvironment> { assert (reExecute && subprocs == null) || !reExecute; } while (reExecute); + // Submit the new subprocedures if (subprocs != null && !procedure.isFailed()) { submitChildrenProcedures(subprocs); } - // if the procedure is complete and has a parent, count down the children latch. - // If 'suspended', do nothing to change state -- let other threads handle unsuspend event. - if (!suspended && procedure.isFinished() && procedure.hasParent()) { + // if the procedure is complete and has a parent, count down the children latch + if (procedure.isFinished() && procedure.hasParent()) { countDownChildren(procStack, procedure); } } @@ -1527,13 +1469,18 @@ public class ProcedureExecutor<TEnvironment> { } // If this procedure is the last child awake the parent procedure - LOG.info("Finish suprocedure " + procedure); - if (parent.tryRunnable()) { - // If we succeeded in making the parent runnable -- i.e. all of its - // children have completed, move parent to front of the queue. + final boolean traceEnabled = LOG.isTraceEnabled(); + if (traceEnabled) { + LOG.trace(parent + " child is done: " + procedure); + } + + if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { + parent.setState(ProcedureState.RUNNABLE); store.update(parent); scheduler.addFront(parent); - LOG.info("Finished subprocedure(s) of " + parent + "; resume parent processing."); + if (traceEnabled) { + LOG.trace(parent + " all the children finished their work, resume."); + } return; } } @@ -1622,10 +1569,9 @@ public class ProcedureExecutor<TEnvironment> { // ========================================================================== private final class WorkerThread extends StoppableThread { private final AtomicLong executionStartTime = new AtomicLong(Long.MAX_VALUE); - private Procedure activeProcedure; public WorkerThread(final ThreadGroup group) { - super(group, "ProcExecWrkr-" + workerId.incrementAndGet()); + super(group, "ProcExecWorker-" + workerId.incrementAndGet()); } @Override @@ -1635,49 +1581,29 @@ public class ProcedureExecutor<TEnvironment> { @Override public void run() { + final boolean traceEnabled = LOG.isTraceEnabled(); long lastUpdate = EnvironmentEdgeManager.currentTime(); - try { - while (isRunning() && keepAlive(lastUpdate)) { - this.activeProcedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); - if (this.activeProcedure == null) continue; - int activeCount = activeExecutorCount.incrementAndGet(); - int runningCount = store.setRunningProcedureCount(activeCount); - if (LOG.isTraceEnabled()) { - LOG.trace("Execute pid=" + this.activeProcedure.getProcId() + - " runningCount=" + runningCount + ", activeCount=" + activeCount); - } - executionStartTime.set(EnvironmentEdgeManager.currentTime()); - try { - executeProcedure(this.activeProcedure); - } catch (AssertionError e) { - LOG.info("ASSERT pid=" + this.activeProcedure.getProcId(), e); - throw e; - } finally { - activeCount = activeExecutorCount.decrementAndGet(); - runningCount = store.setRunningProcedureCount(activeCount); - if (LOG.isTraceEnabled()) { - LOG.trace("Halt pid=" + this.activeProcedure.getProcId() + - " runningCount=" + runningCount + ", activeCount=" + activeCount); - } - this.activeProcedure = null; - lastUpdate = EnvironmentEdgeManager.currentTime(); - executionStartTime.set(Long.MAX_VALUE); + while (isRunning() && keepAlive(lastUpdate)) { + final Procedure procedure = scheduler.poll(keepAliveTime, TimeUnit.MILLISECONDS); + if (procedure == null) continue; + + store.setRunningProcedureCount(activeExecutorCount.incrementAndGet()); + executionStartTime.set(EnvironmentEdgeManager.currentTime()); + try { + if (traceEnabled) { + LOG.trace("Trying to start the execution of " + procedure); } + executeProcedure(procedure); + } finally { + store.setRunningProcedureCount(activeExecutorCount.decrementAndGet()); + lastUpdate = EnvironmentEdgeManager.currentTime(); + executionStartTime.set(Long.MAX_VALUE); } - } catch (Throwable t) { - LOG.warn("Worker terminating UNNATURALLY " + this.activeProcedure, t); - } finally { - LOG.debug("Worker terminated."); } + LOG.debug("Worker thread terminated " + this); workerThreads.remove(this); } - @Override - public String toString() { - Procedure<?> p = this.activeProcedure; - return getName() + "(pid=" + (p == null? Procedure.NO_PROC_ID: p.getProcId() + ")"); - } - /** * @return the time since the current procedure is running */ @@ -1691,15 +1617,14 @@ public class ProcedureExecutor<TEnvironment> { } } - /** - * Runs task on a period such as check for stuck workers. - * @see InlineChore - */ + // ========================================================================== + // Timeout Thread + // ========================================================================== private final class TimeoutExecutorThread extends StoppableThread { private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<>(); public TimeoutExecutorThread(final ThreadGroup group) { - super(group, "ProcExecTimeout"); + super(group, "ProcedureTimeoutExecutor"); } @Override @@ -1709,7 +1634,7 @@ public class ProcedureExecutor<TEnvironment> { @Override public void run() { - final boolean traceEnabled = LOG.isTraceEnabled(); + final boolean isTraceEnabled = LOG.isTraceEnabled(); while (isRunning()) { final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); if (task == null || task == DelayedUtil.DELAYED_POISON) { @@ -1718,8 +1643,8 @@ public class ProcedureExecutor<TEnvironment> { continue; } - if (traceEnabled) { - LOG.trace("Executing " + task); + if (isTraceEnabled) { + LOG.trace("Trying to start the execution of " + task); } // execute the task @@ -1740,8 +1665,6 @@ public class ProcedureExecutor<TEnvironment> { public void add(final Procedure procedure) { assert procedure.getState() == ProcedureState.WAITING_TIMEOUT; - LOG.info("ADDED " + procedure + "; timeout=" + procedure.getTimeout() + - ", timestamp=" + procedure.getTimeoutTimestamp()); queue.add(new DelayedProcedure(procedure)); }
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java index b148dae..bdced10 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureInMemoryChore.java @@ -26,7 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; /** * Special procedure used as a chore. - * Instead of bringing the Chore class in (dependencies reason), + * instead of bringing the Chore class in (dependencies reason), * we reuse the executor timeout thread for this special case. * * The assumption is that procedure is used as hook to dispatch other procedures @@ -43,7 +43,7 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn protected abstract void periodicExecute(final TEnvironment env); @Override - protected Procedure<TEnvironment>[] execute(final TEnvironment env) { + protected Procedure[] execute(final TEnvironment env) { throw new UnsupportedOperationException(); } @@ -66,4 +66,4 @@ public abstract class ProcedureInMemoryChore<TEnvironment> extends Procedure<TEn public void deserializeStateData(final InputStream stream) { throw new UnsupportedOperationException(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java index 233ef57..93d0d5d 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureScheduler.java @@ -93,7 +93,7 @@ public interface ProcedureScheduler { /** * Mark the event as not ready. - * Procedures calling waitEvent() will be suspended. + * procedures calling waitEvent() will be suspended. * @param event the event to mark as suspended/not ready */ void suspendEvent(ProcedureEvent event); @@ -125,7 +125,6 @@ public interface ProcedureScheduler { * List lock queues. * @return the locks */ - // TODO: This seems to be the wrong place to hang this method. List<LockInfo> listLocks(); /** http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java deleted file mode 100644 index 8d5ff3c..0000000 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java +++ /dev/null @@ -1,375 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.procedure2; - -import java.io.IOException; -import java.lang.Thread.UncaughtExceptionHandler; -import java.util.HashSet; -import java.util.List; -import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.DelayQueue; -import java.util.concurrent.Future; -import java.util.concurrent.FutureTask; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.procedure2.util.DelayedUtil; -import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedContainerWithTimestamp; -import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout; -import org.apache.hadoop.hbase.procedure2.util.StringUtils; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Threads; - -import com.google.common.collect.ArrayListMultimap; - -/** - * A procedure dispatcher that aggregates and sends after elapsed time or after we hit - * count threshold. Creates its own threadpool to run RPCs with timeout. - * <ul> - * <li>Each server queue has a dispatch buffer</li> - * <li>Once the dispatch buffer reaches a threshold-size/time we send<li> - * </ul> - * <p>Call {@link #start()} and then {@link #submitTask(Callable)}. When done, - * call {@link #stop()}. - */ -@InterfaceAudience.Private -public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable<TRemote>> { - private static final Log LOG = LogFactory.getLog(RemoteProcedureDispatcher.class); - - public static final String THREAD_POOL_SIZE_CONF_KEY = - "hbase.procedure.remote.dispatcher.threadpool.size"; - private static final int DEFAULT_THREAD_POOL_SIZE = 128; - - public static final String DISPATCH_DELAY_CONF_KEY = - "hbase.procedure.remote.dispatcher.delay.msec"; - private static final int DEFAULT_DISPATCH_DELAY = 150; - - public static final String DISPATCH_MAX_QUEUE_SIZE_CONF_KEY = - "hbase.procedure.remote.dispatcher.max.queue.size"; - private static final int DEFAULT_MAX_QUEUE_SIZE = 32; - - private final AtomicBoolean running = new AtomicBoolean(false); - private final ConcurrentHashMap<TRemote, BufferNode> nodeMap = - new ConcurrentHashMap<TRemote, BufferNode>(); - - private final int operationDelay; - private final int queueMaxSize; - private final int corePoolSize; - - private TimeoutExecutorThread timeoutExecutor; - private ThreadPoolExecutor threadPool; - - protected RemoteProcedureDispatcher(Configuration conf) { - this.corePoolSize = conf.getInt(THREAD_POOL_SIZE_CONF_KEY, DEFAULT_THREAD_POOL_SIZE); - this.operationDelay = conf.getInt(DISPATCH_DELAY_CONF_KEY, DEFAULT_DISPATCH_DELAY); - this.queueMaxSize = conf.getInt(DISPATCH_MAX_QUEUE_SIZE_CONF_KEY, DEFAULT_MAX_QUEUE_SIZE); - } - - public boolean start() { - if (running.getAndSet(true)) { - LOG.warn("Already running"); - return false; - } - - LOG.info("Starting procedure remote dispatcher; threads=" + this.corePoolSize + - ", queueMaxSize=" + this.queueMaxSize + ", operationDelay=" + this.operationDelay); - - // Create the timeout executor - timeoutExecutor = new TimeoutExecutorThread(); - timeoutExecutor.start(); - - // Create the thread pool that will execute RPCs - threadPool = Threads.getBoundedCachedThreadPool(corePoolSize, 60L, TimeUnit.SECONDS, - Threads.newDaemonThreadFactory(this.getClass().getSimpleName(), - getUncaughtExceptionHandler())); - return true; - } - - public boolean stop() { - if (!running.getAndSet(false)) { - return false; - } - - LOG.info("Stopping procedure remote dispatcher"); - - // send stop signals - timeoutExecutor.sendStopSignal(); - threadPool.shutdownNow(); - return true; - } - - public void join() { - assert !running.get() : "expected not running"; - - // wait the timeout executor - timeoutExecutor.awaitTermination(); - timeoutExecutor = null; - - // wait for the thread pool to terminate - threadPool.shutdownNow(); - try { - while (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) { - LOG.warn("Waiting for thread-pool to terminate"); - } - } catch (InterruptedException e) { - LOG.warn("Interrupted while waiting for thread-pool termination", e); - } - } - - protected UncaughtExceptionHandler getUncaughtExceptionHandler() { - return new UncaughtExceptionHandler() { - @Override - public void uncaughtException(Thread t, Throwable e) { - LOG.warn("Failed to execute remote procedures " + t.getName(), e); - } - }; - } - - // ============================================================================================ - // Node Helpers - // ============================================================================================ - /** - * Add a node that will be able to execute remote procedures - * @param key the node identifier - */ - public void addNode(final TRemote key) { - assert key != null: "Tried to add a node with a null key"; - final BufferNode newNode = new BufferNode(key); - nodeMap.putIfAbsent(key, newNode); - } - - /** - * Add a remote rpc. Be sure to check result for successful add. - * @param key the node identifier - * @return True if we successfully added the operation. - */ - public boolean addOperationToNode(final TRemote key, RemoteProcedure rp) { - assert key != null : "found null key for node"; - BufferNode node = nodeMap.get(key); - if (node == null) { - return false; - } - node.add(rp); - // Check our node still in the map; could have been removed by #removeNode. - return nodeMap.contains(node); - } - - /** - * Remove a remote node - * @param key the node identifier - */ - public boolean removeNode(final TRemote key) { - final BufferNode node = nodeMap.remove(key); - if (node == null) return false; - node.abortOperationsInQueue(); - return true; - } - - // ============================================================================================ - // Task Helpers - // ============================================================================================ - protected Future<Void> submitTask(Callable<Void> task) { - return threadPool.submit(task); - } - - protected Future<Void> submitTask(Callable<Void> task, long delay, TimeUnit unit) { - final FutureTask<Void> futureTask = new FutureTask(task); - timeoutExecutor.add(new DelayedTask(futureTask, delay, unit)); - return futureTask; - } - - protected abstract void remoteDispatch(TRemote key, Set<RemoteProcedure> operations); - protected abstract void abortPendingOperations(TRemote key, Set<RemoteProcedure> operations); - - /** - * Data structure with reference to remote operation. - */ - public static abstract class RemoteOperation { - private final RemoteProcedure remoteProcedure; - - protected RemoteOperation(final RemoteProcedure remoteProcedure) { - this.remoteProcedure = remoteProcedure; - } - - public RemoteProcedure getRemoteProcedure() { - return remoteProcedure; - } - } - - /** - * Remote procedure reference. - * @param <TEnv> - * @param <TRemote> - */ - public interface RemoteProcedure<TEnv, TRemote> { - RemoteOperation remoteCallBuild(TEnv env, TRemote remote); - void remoteCallCompleted(TEnv env, TRemote remote, RemoteOperation response); - void remoteCallFailed(TEnv env, TRemote remote, IOException exception); - } - - /** - * Account of what procedures are running on remote node. - * @param <TEnv> - * @param <TRemote> - */ - public interface RemoteNode<TEnv, TRemote> { - TRemote getKey(); - void add(RemoteProcedure<TEnv, TRemote> operation); - void dispatch(); - } - - protected ArrayListMultimap<Class<?>, RemoteOperation> buildAndGroupRequestByType(final TEnv env, - final TRemote remote, final Set<RemoteProcedure> operations) { - final ArrayListMultimap<Class<?>, RemoteOperation> requestByType = ArrayListMultimap.create(); - for (RemoteProcedure proc: operations) { - RemoteOperation operation = proc.remoteCallBuild(env, remote); - requestByType.put(operation.getClass(), operation); - } - return requestByType; - } - - protected <T extends RemoteOperation> List<T> fetchType( - final ArrayListMultimap<Class<?>, RemoteOperation> requestByType, final Class<T> type) { - return (List<T>)requestByType.removeAll(type); - } - - // ============================================================================================ - // Timeout Helpers - // ============================================================================================ - private final class TimeoutExecutorThread extends Thread { - private final DelayQueue<DelayedWithTimeout> queue = new DelayQueue<DelayedWithTimeout>(); - - public TimeoutExecutorThread() { - super("ProcedureDispatcherTimeoutThread"); - } - - @Override - public void run() { - while (running.get()) { - final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue); - if (task == null || task == DelayedUtil.DELAYED_POISON) { - // the executor may be shutting down, and the task is just the shutdown request - continue; - } - if (task instanceof DelayedTask) { - threadPool.execute(((DelayedTask)task).getObject()); - } else { - ((BufferNode)task).dispatch(); - } - } - } - - public void add(final DelayedWithTimeout delayed) { - queue.add(delayed); - } - - public void remove(final DelayedWithTimeout delayed) { - queue.remove(delayed); - } - - public void sendStopSignal() { - queue.add(DelayedUtil.DELAYED_POISON); - } - - public void awaitTermination() { - try { - final long startTime = EnvironmentEdgeManager.currentTime(); - for (int i = 0; isAlive(); ++i) { - sendStopSignal(); - join(250); - if (i > 0 && (i % 8) == 0) { - LOG.warn("Waiting termination of thread " + getName() + ", " + - StringUtils.humanTimeDiff(EnvironmentEdgeManager.currentTime() - startTime)); - } - } - } catch (InterruptedException e) { - LOG.warn(getName() + " join wait got interrupted", e); - } - } - } - - // ============================================================================================ - // Internals Helpers - // ============================================================================================ - - /** - * Node that contains a set of RemoteProcedures - */ - protected final class BufferNode extends DelayedContainerWithTimestamp<TRemote> - implements RemoteNode<TEnv, TRemote> { - private Set<RemoteProcedure> operations; - - protected BufferNode(final TRemote key) { - super(key, 0); - } - - public TRemote getKey() { - return getObject(); - } - - public synchronized void add(final RemoteProcedure operation) { - if (this.operations == null) { - this.operations = new HashSet<>(); - setTimeout(EnvironmentEdgeManager.currentTime() + operationDelay); - timeoutExecutor.add(this); - } - this.operations.add(operation); - if (this.operations.size() > queueMaxSize) { - timeoutExecutor.remove(this); - dispatch(); - } - } - - public synchronized void dispatch() { - if (operations != null) { - remoteDispatch(getKey(), operations); - this.operations = null; - } - } - - public synchronized void abortOperationsInQueue() { - if (operations != null) { - abortPendingOperations(getKey(), operations); - this.operations = null; - } - } - - @Override - public String toString() { - return super.toString() + ", operations=" + this.operations; - } - } - - /** - * Delayed object that holds a FutureTask. - * used to submit something later to the thread-pool. - */ - private static final class DelayedTask extends DelayedContainerWithTimestamp<FutureTask<Void>> { - public DelayedTask(final FutureTask<Void> task, final long delay, final TimeUnit unit) { - super(task, EnvironmentEdgeManager.currentTime() + unit.toMillis(delay)); - } - }; -} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java index 64bb278..1a84070 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/SequentialProcedure.java @@ -27,13 +27,12 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.SequentialProcedureData; /** - * A SequentialProcedure describes one step in a procedure chain: - * <pre> + * A SequentialProcedure describes one step in a procedure chain. * -> Step 1 -> Step 2 -> Step 3 - * </pre> + * * The main difference from a base Procedure is that the execute() of a - * SequentialProcedure will be called only once; there will be no second - * execute() call once the children are finished. which means once the child + * SequentialProcedure will be called only once, there will be no second + * execute() call once the child are finished. which means once the child * of a SequentialProcedure are completed the SequentialProcedure is completed too. */ @InterfaceAudience.Private http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java index becd9b7..0590a93 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/StateMachineProcedure.java @@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.procedure2; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.ArrayList; import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,7 +34,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.StateMa /** * Procedure described by a series of steps. * - * <p>The procedure implementor must have an enum of 'states', describing + * The procedure implementor must have an enum of 'states', describing * the various step of the procedure. * Once the procedure is running, the procedure-framework will call executeFromState() * using the 'state' provided by the user. The first call to executeFromState() @@ -57,7 +56,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState> private int stateCount = 0; private int[] states = null; - private List<Procedure<TEnvironment>> subProcList = null; + private ArrayList<Procedure> subProcList = null; protected enum Flow { HAS_MORE_STATE, @@ -71,7 +70,7 @@ public abstract class StateMachineProcedure<TEnvironment, TState> * Flow.HAS_MORE_STATE if there is another step. */ protected abstract Flow executeFromState(TEnvironment env, TState state) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException; /** * called to perform the rollback of the specified state @@ -126,15 +125,12 @@ public abstract class StateMachineProcedure<TEnvironment, TState> * Add a child procedure to execute * @param subProcedure the child procedure */ - protected void addChildProcedure(Procedure<TEnvironment>... subProcedure) { - if (subProcedure == null) return; - final int len = subProcedure.length; - if (len == 0) return; + protected void addChildProcedure(Procedure... subProcedure) { if (subProcList == null) { - subProcList = new ArrayList<>(len); + subProcList = new ArrayList<>(subProcedure.length); } - for (int i = 0; i < len; ++i) { - Procedure<TEnvironment> proc = subProcedure[i]; + for (int i = 0; i < subProcedure.length; ++i) { + Procedure proc = subProcedure[i]; if (!proc.hasOwner()) proc.setOwner(getOwner()); subProcList.add(proc); } @@ -142,23 +138,27 @@ public abstract class StateMachineProcedure<TEnvironment, TState> @Override protected Procedure[] execute(final TEnvironment env) - throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { + throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException { updateTimestamp(); try { failIfAborted(); if (!hasMoreState() || isFailed()) return null; + TState state = getCurrentState(); if (stateCount == 0) { setNextState(getStateId(state)); } + stateFlow = executeFromState(env, state); if (!hasMoreState()) setNextState(EOF_STATE); - if (subProcList != null && !subProcList.isEmpty()) { + + if (subProcList != null && subProcList.size() != 0) { Procedure[] subProcedures = subProcList.toArray(new Procedure[subProcList.size()]); subProcList = null; return subProcedures; } + return (isWaiting() || isFailed() || !hasMoreState()) ? null : new Procedure[] {this}; } finally { updateTimestamp(); http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java index 9e53f42..c03e326 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/NoopProcedureStore.java @@ -52,8 +52,8 @@ public class NoopProcedureStore extends ProcedureStoreBase { } @Override - public int setRunningProcedureCount(final int count) { - return count; + public void setRunningProcedureCount(final int count) { + // no-op } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java index a690c81..385cedb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/ProcedureStore.java @@ -153,9 +153,8 @@ public interface ProcedureStore { /** * Set the number of procedure running. * This can be used, for example, by the store to know how long to wait before a sync. - * @return how many procedures are running (may not be same as <code>count</code>). */ - int setRunningProcedureCount(int count); + void setRunningProcedureCount(int count); /** * Acquire the lease for the procedure store. http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java index 95a1ef6..012ddeb 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFile.java @@ -155,23 +155,9 @@ public class ProcedureWALFile implements Comparable<ProcedureWALFile> { this.logSize += size; } - public void removeFile(final Path walArchiveDir) throws IOException { + public void removeFile() throws IOException { close(); - boolean archived = false; - if (walArchiveDir != null) { - Path archivedFile = new Path(walArchiveDir, logFile.getName()); - LOG.info("ARCHIVED (TODO: FILES ARE NOT PURGED FROM ARCHIVE!) " + logFile + " to " + archivedFile); - if (!fs.rename(logFile, archivedFile)) { - LOG.warn("Failed archive of " + logFile + ", deleting"); - } else { - archived = true; - } - } - if (!archived) { - if (!fs.delete(logFile, false)) { - LOG.warn("Failed delete of " + logFile); - } - } + fs.delete(logFile, false); } public void setProcIds(long minId, long maxId) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java index 0a05e6e..c672045 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/ProcedureWALFormatReader.java @@ -83,11 +83,11 @@ public class ProcedureWALFormatReader { // // Fast Start: INIT/INSERT record and StackIDs // --------------------------------------------- - // We have two special records, INIT and INSERT, that track the first time - // the procedure was added to the WAL. We can use this information to be able - // to start procedures before reaching the end of the WAL, or before reading all WALs. - // But in some cases, the WAL with that record can be already gone. - // As an alternative, we can use the stackIds on each procedure, + // We have two special record, INIT and INSERT that tracks the first time + // the procedure was added to the WAL. We can use that information to be able + // to start procedures before reaching the end of the WAL, or before reading all the WALs. + // but in some cases the WAL with that record can be already gone. + // In alternative we can use the stackIds on each procedure, // to identify when a procedure is ready to start. // If there are gaps in the sum of the stackIds we need to read more WALs. // @@ -107,16 +107,16 @@ public class ProcedureWALFormatReader { * Global tracker that will be used by the WALProcedureStore after load. * If the last WAL was closed cleanly we already have a full tracker ready to be used. * If the last WAL was truncated (e.g. master killed) the tracker will be empty - * and the 'partial' flag will be set. In this case, on WAL replay we are going + * and the 'partial' flag will be set. In this case on WAL replay we are going * to rebuild the tracker. */ private final ProcedureStoreTracker tracker; - // TODO: private final boolean hasFastStartSupport; + // private final boolean hasFastStartSupport; /** * If tracker for a log file is partial (see {@link ProcedureStoreTracker#partial}), we * re-build the list of procedures updated in that WAL because we need it for log cleaning - * purposes. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. + * purpose. If all procedures updated in a WAL are found to be obsolete, it can be safely deleted. * (see {@link WALProcedureStore#removeInactiveLogs()}). * However, we don't need deleted part of a WAL's tracker for this purpose, so we don't bother * re-building it. @@ -137,7 +137,7 @@ public class ProcedureWALFormatReader { public void read(final ProcedureWALFile log) throws IOException { localTracker = log.getTracker().isPartial() ? log.getTracker() : null; if (localTracker != null) { - LOG.info("Rebuilding tracker for " + log); + LOG.info("Rebuilding tracker for log - " + log); } FSDataInputStream stream = log.getStream(); @@ -146,7 +146,7 @@ public class ProcedureWALFormatReader { while (hasMore) { ProcedureWALEntry entry = ProcedureWALFormat.readEntry(stream); if (entry == null) { - LOG.warn("Nothing left to decode. Exiting with missing EOF, log=" + log); + LOG.warn("nothing left to decode. exiting with missing EOF"); break; } switch (entry.getType()) { @@ -171,7 +171,7 @@ public class ProcedureWALFormatReader { } } } catch (InvalidProtocolBufferException e) { - LOG.error("While reading procedure from " + log, e); + LOG.error("got an exception while reading the procedure WAL: " + log, e); loader.markCorruptedWAL(log, e); } @@ -211,7 +211,7 @@ public class ProcedureWALFormatReader { maxProcId = Math.max(maxProcId, proc.getProcId()); if (isRequired(proc.getProcId())) { if (LOG.isTraceEnabled()) { - LOG.trace("Read " + entry.getType() + " entry " + proc.getProcId()); + LOG.trace("read " + entry.getType() + " entry " + proc.getProcId()); } localProcedureMap.add(proc); if (tracker.isPartial()) { @@ -296,7 +296,7 @@ public class ProcedureWALFormatReader { // replayOrderHead = C <-> B <-> E <-> D <-> A <-> G // // We also have a lazy grouping by "root procedure", and a list of - // unlinked procedures. If after reading all the WALs we have unlinked + // unlinked procedure. If after reading all the WALs we have unlinked // procedures it means that we had a missing WAL or a corruption. // rootHead = A <-> D <-> G // B E @@ -639,17 +639,17 @@ public class ProcedureWALFormatReader { * "ready" means that we all the information that we need in-memory. * * Example-1: - * We have two WALs, we start reading from the newest (wal-2) + * We have two WALs, we start reading fronm the newest (wal-2) * wal-2 | C B | * wal-1 | A B C | * * If C and B don't depend on A (A is not the parent), we can start them - * before reading wal-1. If B is the only one with parent A we can start C. - * We have to read one more WAL before being able to start B. + * before reading wal-1. If B is the only one with parent A we can start C + * and read one more WAL before being able to start B. * * How do we know with the only information in B that we are not ready. * - easy case, the parent is missing from the global map - * - more complex case we look at the Stack IDs. + * - more complex case we look at the Stack IDs * * The Stack-IDs are added to the procedure order as incremental index * tracking how many times that procedure was executed, which is equivalent @@ -664,7 +664,7 @@ public class ProcedureWALFormatReader { * executed before. * To identify when a Procedure is ready we do the sum of the stackIds of * the procedure and the parent. if the stackIdSum is equals to the - * sum of {1..maxStackId} then everything we need is available. + * sum of {1..maxStackId} then everything we need is avaiable. * * Example-2 * wal-2 | A | A stackIds = [0, 2] @@ -676,7 +676,7 @@ public class ProcedureWALFormatReader { assert !rootEntry.hasParent() : "expected root procedure, got " + rootEntry; if (rootEntry.isFinished()) { - // If the root procedure is finished, sub-procedures should be gone + // if the root procedure is finished, sub-procedures should be gone if (rootEntry.childHead != null) { LOG.error("unexpected active children for root-procedure: " + rootEntry); for (Entry p = rootEntry.childHead; p != null; p = p.linkNext) { http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java index 1791cae..4712c30 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/store/wal/WALProcedureStore.java @@ -66,7 +66,6 @@ import com.google.common.annotations.VisibleForTesting; @InterfaceStability.Evolving public class WALProcedureStore extends ProcedureStoreBase { private static final Log LOG = LogFactory.getLog(WALProcedureStore.class); - public static final String LOG_PREFIX = "pv2-"; public interface LeaseRecovery { void recoverFileLease(FileSystem fs, Path path) throws IOException; @@ -125,7 +124,6 @@ public class WALProcedureStore extends ProcedureStoreBase { private final Configuration conf; private final FileSystem fs; private final Path walDir; - private final Path walArchiveDir; private final AtomicReference<Throwable> syncException = new AtomicReference<>(); private final AtomicBoolean loading = new AtomicBoolean(true); @@ -187,15 +185,9 @@ public class WALProcedureStore extends ProcedureStoreBase { public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, final LeaseRecovery leaseRecovery) { - this(conf, fs, walDir, null, leaseRecovery); - } - - public WALProcedureStore(final Configuration conf, final FileSystem fs, final Path walDir, - final Path walArchiveDir, final LeaseRecovery leaseRecovery) { this.fs = fs; this.conf = conf; this.walDir = walDir; - this.walArchiveDir = walArchiveDir; this.leaseRecovery = leaseRecovery; } @@ -247,16 +239,6 @@ public class WALProcedureStore extends ProcedureStoreBase { } }; syncThread.start(); - - // Create archive dir up front. Rename won't work w/o it up on HDFS. - if (this.walArchiveDir != null && !this.fs.exists(this.walArchiveDir)) { - if (this.fs.mkdirs(this.walArchiveDir)) { - if (LOG.isDebugEnabled()) LOG.debug("Created Procedure Store WAL archive dir " + - this.walArchiveDir); - } else { - LOG.warn("Failed create of " + this.walArchiveDir); - } - } } @Override @@ -310,9 +292,9 @@ public class WALProcedureStore extends ProcedureStoreBase { } @Override - public int setRunningProcedureCount(final int count) { + public void setRunningProcedureCount(final int count) { + LOG.debug("Set running procedure count=" + count + ", slots=" + slots.length); this.runningProcCount = count > 0 ? Math.min(count, slots.length) : slots.length; - return this.runningProcCount; } public ProcedureStoreTracker getStoreTracker() { @@ -361,7 +343,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (LOG.isDebugEnabled()) { LOG.debug("Someone else created new logs. Expected maxLogId < " + flushLogId); } - logs.getLast().removeFile(this.walArchiveDir); + logs.getLast().removeFile(); continue; } @@ -973,7 +955,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // but we should check if someone else has created new files if (getMaxLogId(getLogFiles()) > flushLogId) { LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId); - logs.getLast().removeFile(this.walArchiveDir); + logs.getLast().removeFile(); return false; } @@ -1065,7 +1047,7 @@ public class WALProcedureStore extends ProcedureStoreBase { // We keep track of which procedures are holding the oldest WAL in 'holdingCleanupTracker'. // once there is nothing olding the oldest WAL we can remove it. while (logs.size() > 1 && holdingCleanupTracker.isEmpty()) { - removeLogFile(logs.getFirst(), walArchiveDir); + removeLogFile(logs.getFirst()); buildHoldingCleanupTracker(); } @@ -1097,8 +1079,8 @@ public class WALProcedureStore extends ProcedureStoreBase { private void removeAllLogs(long lastLogId) { if (logs.size() <= 1) return; - if (LOG.isTraceEnabled()) { - LOG.trace("Remove all state logs with ID less than " + lastLogId); + if (LOG.isDebugEnabled()) { + LOG.debug("Remove all state logs with ID less than " + lastLogId); } boolean removed = false; @@ -1107,7 +1089,7 @@ public class WALProcedureStore extends ProcedureStoreBase { if (lastLogId < log.getLogId()) { break; } - removeLogFile(log, walArchiveDir); + removeLogFile(log); removed = true; } @@ -1116,15 +1098,15 @@ public class WALProcedureStore extends ProcedureStoreBase { } } - private boolean removeLogFile(final ProcedureWALFile log, final Path walArchiveDir) { + private boolean removeLogFile(final ProcedureWALFile log) { try { if (LOG.isTraceEnabled()) { LOG.trace("Removing log=" + log); } - log.removeFile(walArchiveDir); + log.removeFile(); logs.remove(log); if (LOG.isDebugEnabled()) { - LOG.info("Removed log=" + log + ", activeLogs=" + logs); + LOG.info("Removed log=" + log + " activeLogs=" + logs); } assert logs.size() > 0 : "expected at least one log"; } catch (IOException e) { @@ -1146,7 +1128,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } protected Path getLogFilePath(final long logId) throws IOException { - return new Path(walDir, String.format(LOG_PREFIX + "%020d.log", logId)); + return new Path(walDir, String.format("state-%020d.log", logId)); } private static long getLogIdFromName(final String name) { @@ -1159,7 +1141,7 @@ public class WALProcedureStore extends ProcedureStoreBase { @Override public boolean accept(Path path) { String name = path.getName(); - return name.startsWith(LOG_PREFIX) && name.endsWith(".log"); + return name.startsWith("state-") && name.endsWith(".log"); } }; @@ -1210,7 +1192,7 @@ public class WALProcedureStore extends ProcedureStoreBase { } maxLogId = Math.max(maxLogId, getLogIdFromName(logPath.getName())); - ProcedureWALFile log = initOldLog(logFiles[i], this.walArchiveDir); + ProcedureWALFile log = initOldLog(logFiles[i]); if (log != null) { this.logs.add(log); } @@ -1240,22 +1222,21 @@ public class WALProcedureStore extends ProcedureStoreBase { /** * Loads given log file and it's tracker. */ - private ProcedureWALFile initOldLog(final FileStatus logFile, final Path walArchiveDir) - throws IOException { + private ProcedureWALFile initOldLog(final FileStatus logFile) throws IOException { final ProcedureWALFile log = new ProcedureWALFile(fs, logFile); if (logFile.getLen() == 0) { LOG.warn("Remove uninitialized log: " + logFile); - log.removeFile(walArchiveDir); + log.removeFile(); return null; } if (LOG.isDebugEnabled()) { - LOG.debug("Opening Pv2 " + logFile); + LOG.debug("Opening state-log: " + logFile); } try { log.open(); } catch (ProcedureWALFormat.InvalidWALDataException e) { LOG.warn("Remove uninitialized log: " + logFile, e); - log.removeFile(walArchiveDir); + log.removeFile(); return null; } catch (IOException e) { String msg = "Unable to read state log: " + logFile; http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java index faf8e7e..cde37bd 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -// FIX namings. TODO. @InterfaceAudience.Private @InterfaceStability.Evolving public final class DelayedUtil { @@ -149,9 +148,6 @@ public final class DelayedUtil { } } - /** - * Has a timeout. - */ public static class DelayedContainerWithTimestamp<T> extends DelayedContainer<T> { private long timeout; @@ -169,4 +165,4 @@ public final class DelayedUtil { this.timeout = timeout; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/a3c5a744/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java index 78daf5a..408cffd 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureToString.java @@ -42,7 +42,7 @@ public class TestProcedureToString { */ static class BasicProcedure extends Procedure<BasicProcedureEnv> { @Override - protected Procedure<BasicProcedureEnv>[] execute(BasicProcedureEnv env) + protected Procedure<?>[] execute(BasicProcedureEnv env) throws ProcedureYieldException, InterruptedException { return new Procedure [] {this}; } @@ -78,6 +78,8 @@ public class TestProcedureToString { } } + + /** * Test that I can override the toString for its state value. * @throws ProcedureYieldException