HBASE-13202 Procedure v2 - core framework
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d71f54f8 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d71f54f8 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d71f54f8 Branch: refs/heads/hbase-12439 Commit: d71f54f83169dfd6a1fbce411a65b529052c492a Parents: 5d2c331 Author: Matteo Bertozzi <matteo.berto...@cloudera.com> Authored: Wed Mar 25 18:26:20 2015 +0000 Committer: Matteo Bertozzi <matteo.berto...@cloudera.com> Committed: Fri Mar 27 13:58:44 2015 +0000 ---------------------------------------------------------------------- .../hadoop/hbase/io/util/StreamUtils.java | 12 +- .../hadoop/hbase/util/ForeignExceptionUtil.java | 105 + hbase-procedure/pom.xml | 200 + .../hbase/procedure2/OnePhaseProcedure.java | 28 + .../hadoop/hbase/procedure2/Procedure.java | 659 ++ .../procedure2/ProcedureAbortedException.java | 42 + .../hbase/procedure2/ProcedureException.java | 45 + .../hbase/procedure2/ProcedureExecutor.java | 1004 +++ .../procedure2/ProcedureFairRunQueues.java | 146 + .../hbase/procedure2/ProcedureResult.java | 95 + .../hbase/procedure2/ProcedureRunnableSet.java | 71 + .../procedure2/ProcedureSimpleRunQueue.java | 116 + .../procedure2/ProcedureYieldException.java | 40 + .../procedure2/RemoteProcedureException.java | 116 + .../hbase/procedure2/RootProcedureState.java | 177 + .../hbase/procedure2/SequentialProcedure.java | 80 + .../hbase/procedure2/StateMachineProcedure.java | 140 + .../hbase/procedure2/TwoPhaseProcedure.java | 28 + .../hbase/procedure2/store/ProcedureStore.java | 116 + .../procedure2/store/ProcedureStoreTracker.java | 540 ++ .../CorruptedWALProcedureStoreException.java | 43 + .../procedure2/store/wal/ProcedureWALFile.java | 152 + .../store/wal/ProcedureWALFormat.java | 234 + .../store/wal/ProcedureWALFormatReader.java | 166 + .../procedure2/store/wal/WALProcedureStore.java | 714 ++ .../hadoop/hbase/procedure2/util/ByteSlot.java | 111 + 
.../hbase/procedure2/util/StringUtils.java | 80 + .../procedure2/util/TimeoutBlockingQueue.java | 216 + .../procedure2/ProcedureTestingUtility.java | 130 + .../procedure2/TestProcedureExecution.java | 303 + .../hbase/procedure2/TestProcedureRecovery.java | 488 ++ .../procedure2/TestProcedureRunQueues.java | 72 + .../store/TestProcedureStoreTracker.java | 168 + .../store/wal/TestWALProcedureStore.java | 267 + .../util/TestTimeoutBlockingQueue.java | 137 + hbase-protocol/pom.xml | 1 + .../protobuf/generated/ProcedureProtos.java | 7219 ++++++++++++++++++ .../src/main/protobuf/Procedure.proto | 114 + pom.xml | 21 +- 39 files changed, 14391 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java index 314ed2b..0b442a5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/util/StreamUtils.java @@ -120,7 +120,7 @@ public class StreamUtils { /** * Reads a varInt value stored in an array. 
- * + * * @param input * Input array where the varInt is available * @param offset @@ -198,4 +198,14 @@ public class StreamUtils { out.write((byte) (0xff & (v >> 8))); out.write((byte) (0xff & v)); } + + public static long readLong(InputStream in) throws IOException { + long result = 0; + for (int shift = 56; shift >= 0; shift -= 8) { + long x = in.read(); + if (x < 0) throw new IOException("EOF"); + result |= (x << shift); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java new file mode 100644 index 0000000..94a8c8c --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ForeignExceptionUtil.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.ForeignExceptionMessage; +import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.GenericExceptionMessage; +import org.apache.hadoop.hbase.protobuf.generated.ErrorHandlingProtos.StackTraceElementMessage; + +/** + * Helper to convert Exceptions and StackTraces from/to protobuf. + * (see ErrorHandling.proto for the internal of the proto messages) + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ForeignExceptionUtil { + private ForeignExceptionUtil() { } + + public static IOException toIOException(final ForeignExceptionMessage eem) { + GenericExceptionMessage gem = eem.getGenericException(); + StackTraceElement[] trace = toStackTrace(gem.getTraceList()); + RemoteException re = new RemoteException(gem.getClassName(), gem.getMessage()); + re.setStackTrace(trace); + return re.unwrapRemoteException(); + } + + public static ForeignExceptionMessage toProtoForeignException(String source, Throwable t) { + GenericExceptionMessage.Builder gemBuilder = GenericExceptionMessage.newBuilder(); + gemBuilder.setClassName(t.getClass().getName()); + if (t.getMessage() != null) { + gemBuilder.setMessage(t.getMessage()); + } + // set the stack trace, if there is one + List<StackTraceElementMessage> stack = toProtoStackTraceElement(t.getStackTrace()); + if (stack != null) { + gemBuilder.addAllTrace(stack); + } + GenericExceptionMessage payload = gemBuilder.build(); + ForeignExceptionMessage.Builder exception = ForeignExceptionMessage.newBuilder(); + exception.setGenericException(payload).setSource(source); + return exception.build(); + } + + /** + * Convert a stack 
trace to list of {@link StackTraceElementMessage}. + * @param trace the stack trace to convert to protobuf message + * @return <tt>null</tt> if the passed stack is <tt>null</tt>. + */ + public static List<StackTraceElementMessage> toProtoStackTraceElement(StackTraceElement[] trace) { + // if there is no stack trace, ignore it and just return the message + if (trace == null) return null; + // build the stack trace for the message (the file name may be unavailable, i.e. null) + List<StackTraceElementMessage> pbTrace = new ArrayList<StackTraceElementMessage>(trace.length); + for (StackTraceElement elem : trace) { + StackTraceElementMessage.Builder stackBuilder = StackTraceElementMessage.newBuilder(); + stackBuilder.setDeclaringClass(elem.getClassName()); + if (elem.getFileName() != null) stackBuilder.setFileName(elem.getFileName()); + stackBuilder.setLineNumber(elem.getLineNumber()); + stackBuilder.setMethodName(elem.getMethodName()); + pbTrace.add(stackBuilder.build()); + } + return pbTrace; + } + + /** + * Unwind a serialized list of {@link StackTraceElementMessage}s to an array of + * {@link StackTraceElement}s. + * @param traceList list that was serialized + * @return the deserialized stack trace, or an empty array if the list was null or empty + * (e.g. it wasn't set on the sender). 
+ */ + public static StackTraceElement[] toStackTrace(List<StackTraceElementMessage> traceList) { + if (traceList == null || traceList.size() == 0) { + return new StackTraceElement[0]; // empty array + } + StackTraceElement[] trace = new StackTraceElement[traceList.size()]; + for (int i = 0; i < traceList.size(); i++) { + StackTraceElementMessage elem = traceList.get(i); + trace[i] = new StackTraceElement( + elem.getDeclaringClass(), elem.getMethodName(), elem.getFileName(), elem.getLineNumber()); + } + return trace; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/pom.xml ---------------------------------------------------------------------- diff --git a/hbase-procedure/pom.xml b/hbase-procedure/pom.xml new file mode 100644 index 0000000..fce3102 --- /dev/null +++ b/hbase-procedure/pom.xml @@ -0,0 +1,200 @@ +<?xml version="1.0"?> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<!-- +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +--> + <modelVersion>4.0.0</modelVersion> + <parent> + <artifactId>hbase</artifactId> + <groupId>org.apache.hbase</groupId> + <version>2.0.0-SNAPSHOT</version> + <relativePath>..</relativePath> + </parent> + + <artifactId>hbase-procedure</artifactId> + <name>HBase - Procedure</name> + <description>Procedure Framework</description> + + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <id>default-compile</id> + <configuration> + <compilerId>javac-with-errorprone</compilerId> + <forceJavacCompilerUse>true</forceJavacCompilerUse> + </configuration> + </execution> + <execution> + <id>default-testCompile</id> + <configuration> + <compilerId>javac-with-errorprone</compilerId> + <forceJavacCompilerUse>true</forceJavacCompilerUse> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-site-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + <!-- Make a jar and put the sources in the jar --> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-source-plugin</artifactId> + </plugin> + <plugin> + <!--Make it so assembly:single does nothing in here--> + <artifactId>maven-assembly-plugin</artifactId> + <version>${maven.assembly.version}</version> + <configuration> + <skipAssembly>true</skipAssembly> + </configuration> + </plugin> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <!-- Always skip the second part executions, since we only run + simple unit tests in this module. 
--> + <executions> + <execution> + <id>secondPartTestsExecution</id> + <phase>test</phase> + <goals> + <goal>test</goal> + </goals> + <configuration> + <skip>true</skip> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + + <dependencies> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <version>${project.version}</version> + <classifier>tests</classifier> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-annotations</artifactId> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-protocol</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + <dependency> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </dependency> + </dependencies> + + <profiles> + <!-- Profiles for building against different hadoop versions --> + <profile> + <id>hadoop-1.1</id> + <activation> + <property> + <!--Below formatting for dev-support/generate-hadoopX-poms.sh--> + <!--h1--><name>hadoop.profile</name><value>1.1</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </dependency> + </dependencies> + </profile> + <profile> + <id>hadoop-1.0</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>1.0</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </dependency> + </dependencies> + </profile> + <!-- + profile for building against Hadoop 2.0.0-alpha. 
Activate using: + mvn -Dhadoop.profile=2.0 + --> + <profile> + <id>hadoop-2.0</id> + <activation> + <property> + <!--Below formatting for dev-support/generate-hadoopX-poms.sh--> + <!--h2--><name>!hadoop.profile</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + </profile> + <!-- + profile for building against Hadoop 3.0.x. Activate using: + mvn -Dhadoop.profile=3.0 + --> + <profile> + <id>hadoop-3.0</id> + <activation> + <property> + <name>hadoop.profile</name> + <value>3.0</value> + </property> + </activation> + <properties> + <hadoop.version>3.0-SNAPSHOT</hadoop.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> +</project> http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java new file mode 100644 index 0000000..1c3be2d --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/OnePhaseProcedure.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public abstract class OnePhaseProcedure<TEnvironment> extends Procedure<TEnvironment> { + // TODO (e.g. used by online snapshots) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java new file mode 100644 index 0000000..899fe8d --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -0,0 +1,659 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.lang.reflect.Constructor; +import java.lang.reflect.Modifier; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.util.ByteStringer; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; + +/** + * Base Procedure class responsible to handle the Procedure Metadata + * e.g. state, startTime, lastUpdate, stack-indexes, ... + * + * execute() is called each time the procedure is executed. + * it may be called multiple times in case of failure and restart, so the + * code must be idempotent. + * the return is a set of sub-procedures or null in case the procedure doesn't + * have sub-procedures. 
Once the sub-procedures are successfully completed + * the execute() method is called again, you should think of it as a stack: + * -> step 1 + * ---> step 2 + * -> step 1 + * + * rollback() is called when the procedure or one of the sub-procedures has failed. + * the rollback step is supposed to clean up the resources created during the + * execute() step. in case of failure and restart rollback() may be called + * multiple times, so the code must be idempotent. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { + // unchanged after initialization + private String owner = null; + private Long parentProcId = null; + private Long procId = null; + private long startTime; + + // runtime state, updated every operation + private ProcedureState state = ProcedureState.INITIALIZING; + private Integer timeout = null; + private int[] stackIndexes = null; + private int childrenLatch = 0; + private long lastUpdate; + + private RemoteProcedureException exception = null; + private byte[] result = null; + + /** + * The main code of the procedure. It must be idempotent since execute() + * may be called multiple times in case of machine failure in the middle + * of the execution. + * @return a set of sub-procedures or null if there is nothing else to execute. + */ + protected abstract Procedure[] execute(TEnvironment env) + throws ProcedureYieldException; + + /** + * The code to undo what was done by the execute() code. + * It is called when the procedure or one of the sub-procedures failed or an + * abort was requested. It should clean up all the resources created by + * the execute() call. The implementation must be idempotent since rollback() + * may be called multiple times in case of machine failure in the middle + * of the execution. 
+ */ + protected abstract void rollback(TEnvironment env); + + /** + * The abort() call is asynchronous and each procedure must decide how to deal + * with that, if they want to be abortable. The simplest implementation + * is to have an AtomicBoolean set in the abort() method and then the execute() + * will check if the abort flag is set or not. + * abort() may be called multiple times from the client, so the implementation + * must be idempotent. + * + * NOTE: abort() is not like Thread.interrupt(); it is just a notification + * that lets the procedure implementor decide where to abort, to avoid leaks and + * have better control over what was executed and what was not. + */ + protected abstract boolean abort(TEnvironment env); + + /** + * The user-level code of the procedure may have some state to + * persist (e.g. input arguments) to be able to resume on failure. + * @param stream the stream that will contain the user serialized data + */ + protected abstract void serializeStateData(final OutputStream stream) + throws IOException; + + /** + * Called on store load to allow the user to decode the previously serialized + * state. + * @param stream the stream that contains the user serialized data + */ + protected abstract void deserializeStateData(final InputStream stream) + throws IOException; + + /** + * The user should override this method, and try to take a lock if necessary. + * A lock can be anything, and it is up to the implementor. + * Example: in our Master we can execute requests in parallel for different tables: + * create t1 and create t2 can be executed at the same time. + * Anything else on t1/t2 is queued, waiting for that specific table create to happen. + * + * @return true if the lock was acquired and false otherwise + */ + protected boolean acquireLock(final TEnvironment env) { + return true; + } + + /** + * The user should override this method, and release lock if necessary. 
+ */ + protected void releaseLock(final TEnvironment env) { + // no-op + } + + /** + * Called when the procedure is marked as completed (success or rollback). + * The procedure implementor may use this method to cleanup in-memory states. + * This operation will not be retried on failure. + */ + protected void completionCleanup(final TEnvironment env) { + // no-op + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(getClass().getName()); + + if (procId != null) { + sb.append(" id="); + sb.append(getProcId()); + } + + if (hasParent()) { + sb.append(" parent="); + sb.append(getParentProcId()); + } + + if (hasOwner()) { + sb.append(" owner="); + sb.append(getOwner()); + } + + sb.append(" state="); + sb.append(getState()); + return sb.toString(); + } + + /** + * @return the serialized result if any, otherwise null + */ + public byte[] getResult() { + return result; + } + + /** + * The procedure may leave a "result" on completion. + * @param result the serialized result that will be passed to the client + */ + protected void setResult(final byte[] result) { + this.result = result; + } + + public long getProcId() { + return procId; + } + + public boolean hasParent() { + return parentProcId != null; + } + + public boolean hasException() { + return exception != null; + } + + public boolean hasTimeout() { + return timeout != null; + } + + public long getParentProcId() { + return parentProcId; + } + + /** + * @return true if the procedure has failed. + * true may mean failed but not yet rolledback or failed and rolledback. + */ + public synchronized boolean isFailed() { + return exception != null || state == ProcedureState.ROLLEDBACK; + } + + /** + * @return true if the procedure is finished successfully. + */ + public synchronized boolean isSuccess() { + return state == ProcedureState.FINISHED && exception == null; + } + + /** + * @return true if the procedure is finished. 
The Procedure may be completed + * successfully or failed and rolled back. + */ + public synchronized boolean isFinished() { + switch (state) { + case ROLLEDBACK: + return true; + case FINISHED: + return exception == null; + default: + break; + } + return false; + } + + /** + * @return true if the procedure is waiting for a child to finish or for an external event. + */ + public synchronized boolean isWaiting() { + switch (state) { + case WAITING: + case WAITING_TIMEOUT: + return true; + default: + break; + } + return false; + } + + public synchronized RemoteProcedureException getException() { + return exception; + } + + public long getStartTime() { + return startTime; + } + + public synchronized long getLastUpdate() { + return lastUpdate; + } + + public synchronized long elapsedTime() { + return lastUpdate - startTime; + } + + /** + * @param timeout timeout in msec + */ + protected void setTimeout(final int timeout) { + this.timeout = timeout; + } + + /** + * @return the timeout in msec + */ + public int getTimeout() { + return timeout; + } + + /** + * @return the remaining time before the timeout (never negative) + */ + public long getTimeRemaining() { + return Math.max(0, timeout - (EnvironmentEdgeManager.currentTime() - startTime)); + } + + protected void setOwner(final String owner) { + this.owner = StringUtils.isEmpty(owner) ? 
null : owner; + } + + public String getOwner() { + return owner; + } + + public boolean hasOwner() { + return owner != null; + } + + @VisibleForTesting + @InterfaceAudience.Private + protected synchronized void setState(final ProcedureState state) { + this.state = state; + updateTimestamp(); + } + + @InterfaceAudience.Private + protected synchronized ProcedureState getState() { + return state; + } + + protected void setFailure(final String source, final Throwable cause) { + setFailure(new RemoteProcedureException(source, cause)); + } + + protected synchronized void setFailure(final RemoteProcedureException exception) { + this.exception = exception; + if (!isFinished()) { + setState(ProcedureState.FINISHED); + } + } + + protected void setAbortFailure(final String source, final String msg) { + setFailure(source, new ProcedureAbortedException(msg)); + } + + @InterfaceAudience.Private + protected synchronized boolean setTimeoutFailure() { + if (state == ProcedureState.WAITING_TIMEOUT) { + long timeDiff = EnvironmentEdgeManager.currentTime() - lastUpdate; + setFailure("ProcedureExecutor", new TimeoutException( + "Operation timed out after " + StringUtils.humanTimeDiff(timeDiff))); + return true; + } + return false; + } + + /** + * Called by the ProcedureExecutor to assign the ID to the newly created procedure. + */ + @VisibleForTesting + @InterfaceAudience.Private + protected void setProcId(final long procId) { + this.procId = procId; + this.startTime = EnvironmentEdgeManager.currentTime(); + setState(ProcedureState.RUNNABLE); + } + + /** + * Called by the ProcedureExecutor to assign the parent to the newly created procedure. + */ + @InterfaceAudience.Private + protected void setParentProcId(final long parentProcId) { + this.parentProcId = parentProcId; + } + + /** + * Internal method called by the ProcedureExecutor that starts the + * user-level code execute(). 
+ */ + @InterfaceAudience.Private + protected Procedure[] doExecute(final TEnvironment env) + throws ProcedureYieldException { + try { + updateTimestamp(); + return execute(env); + } finally { + updateTimestamp(); + } + } + + /** + * Internal method called by the ProcedureExecutor that starts the + * user-level code rollback(). + */ + @InterfaceAudience.Private + protected void doRollback(final TEnvironment env) { + try { + updateTimestamp(); + rollback(env); + } finally { + updateTimestamp(); + } + } + + /** + * Called on store load to initialize the Procedure internals after + * the creation/deserialization. + */ + @InterfaceAudience.Private + protected void setStartTime(final long startTime) { + this.startTime = startTime; + } + + /** + * Called on store load to initialize the Procedure internals after + * the creation/deserialization. + */ + private synchronized void setLastUpdate(final long lastUpdate) { + this.lastUpdate = lastUpdate; + } + + protected synchronized void updateTimestamp() { + this.lastUpdate = EnvironmentEdgeManager.currentTime(); + } + + /** + * Called by the ProcedureExecutor on procedure-load to restore the latch state + */ + @InterfaceAudience.Private + protected synchronized void setChildrenLatch(final int numChildren) { + this.childrenLatch = numChildren; + } + + /** + * Called by the ProcedureExecutor on procedure-load to restore the latch state + */ + @InterfaceAudience.Private + protected synchronized void incChildrenLatch() { + // TODO: can this be inferred from the stack? I think so... + this.childrenLatch++; + } + + /** + * Called by the ProcedureExecutor to notify that one of the sub-procedures + * has completed. + */ + @InterfaceAudience.Private + protected synchronized boolean childrenCountDown() { + assert childrenLatch > 0; + return --childrenLatch == 0; + } + + /** + * Called by the RootProcedureState on procedure execution. + * Each procedure store its stack-index positions. 
+ */ + @InterfaceAudience.Private + protected void addStackIndex(final int index) { + if (stackIndexes == null) { + stackIndexes = new int[] { index }; + } else { + int count = stackIndexes.length; + stackIndexes = Arrays.copyOf(stackIndexes, count + 1); + stackIndexes[count] = index; + } + } + + @InterfaceAudience.Private + protected boolean removeStackIndex() { + if (stackIndexes.length > 1) { + stackIndexes = Arrays.copyOf(stackIndexes, stackIndexes.length - 1); + return false; + } else { + stackIndexes = null; + return true; + } + } + + /** + * Called on store load to initialize the Procedure internals after + * the creation/deserialization. + */ + @InterfaceAudience.Private + protected void setStackIndexes(final List<Integer> stackIndexes) { + this.stackIndexes = new int[stackIndexes.size()]; + for (int i = 0; i < this.stackIndexes.length; ++i) { + this.stackIndexes[i] = stackIndexes.get(i); + } + } + + @InterfaceAudience.Private + protected boolean wasExecuted() { + return stackIndexes != null; + } + + @InterfaceAudience.Private + protected int[] getStackIndexes() { + return stackIndexes; + } + + @Override + public int compareTo(final Procedure other) { + long diff = getProcId() - other.getProcId(); + return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; + } + + /* + * Helper to lookup the root Procedure ID given a specified procedure. 
+ */ + @InterfaceAudience.Private + protected static Long getRootProcedureId(final Map<Long, Procedure> procedures, Procedure proc) { + while (proc.hasParent()) { + proc = procedures.get(proc.getParentProcId()); + if (proc == null) return null; + } + return proc.getProcId(); + } + + protected static Procedure newInstance(final String className) throws IOException { + try { + Class<?> clazz = Class.forName(className); + if (!Modifier.isPublic(clazz.getModifiers())) { + throw new Exception("the " + clazz + " class is not public"); + } + + Constructor<?> ctor = clazz.getConstructor(); + assert ctor != null : "no constructor found"; + if (!Modifier.isPublic(ctor.getModifiers())) { + throw new Exception("the " + clazz + " constructor is not public"); + } + return (Procedure)ctor.newInstance(); + } catch (Exception e) { + throw new IOException("The procedure class " + className + + " must be accessible and have an empty constructor", e); + } + } + + protected static void validateClass(final Procedure proc) throws IOException { + try { + Class<?> clazz = proc.getClass(); + if (!Modifier.isPublic(clazz.getModifiers())) { + throw new Exception("the " + clazz + " class is not public"); + } + + Constructor<?> ctor = clazz.getConstructor(); + assert ctor != null; + if (!Modifier.isPublic(ctor.getModifiers())) { + throw new Exception("the " + clazz + " constructor is not public"); + } + } catch (Exception e) { + throw new IOException("The procedure class " + proc.getClass().getName() + + " must be accessible and have an empty constructor", e); + } + } + + /** + * Helper to convert the procedure to protobuf. + * Used by ProcedureStore implementations. 
+ */ + @InterfaceAudience.Private + public static ProcedureProtos.Procedure convert(final Procedure proc) + throws IOException { + Preconditions.checkArgument(proc != null); + validateClass(proc); + + ProcedureProtos.Procedure.Builder builder = ProcedureProtos.Procedure.newBuilder() + .setClassName(proc.getClass().getName()) + .setProcId(proc.getProcId()) + .setState(proc.getState()) + .setStartTime(proc.getStartTime()) + .setLastUpdate(proc.getLastUpdate()); + + if (proc.hasParent()) { + builder.setParentId(proc.getParentProcId()); + } + + if (proc.hasTimeout()) { + builder.setTimeout(proc.getTimeout()); + } + + if (proc.hasOwner()) { + builder.setOwner(proc.getOwner()); + } + + int[] stackIds = proc.getStackIndexes(); + if (stackIds != null) { + for (int i = 0; i < stackIds.length; ++i) { + builder.addStackId(stackIds[i]); + } + } + + if (proc.hasException()) { + RemoteProcedureException exception = proc.getException(); + builder.setException( + RemoteProcedureException.toProto(exception.getSource(), exception.getCause())); + } + + byte[] result = proc.getResult(); + if (result != null) { + builder.setResult(ByteStringer.wrap(result)); + } + + ByteString.Output stateStream = ByteString.newOutput(); + proc.serializeStateData(stateStream); + if (stateStream.size() > 0) { + builder.setStateData(stateStream.toByteString()); + } + + return builder.build(); + } + + /** + * Helper to convert the protobuf procedure. + * Used by ProcedureStore implementations. + * + * TODO: OPTIMIZATION: some of the field never change during the execution + * (e.g. className, procId, parentId, ...). + * We can split in 'data' and 'state', and the store + * may take advantage of it by storing the data only on insert(). 
+ */ + @InterfaceAudience.Private + public static Procedure convert(final ProcedureProtos.Procedure proto) + throws IOException { + // Procedure from class name + Procedure proc = Procedure.newInstance(proto.getClassName()); + + // set fields + proc.setProcId(proto.getProcId()); + proc.setState(proto.getState()); + proc.setStartTime(proto.getStartTime()); + proc.setLastUpdate(proto.getLastUpdate()); + + if (proto.hasParentId()) { + proc.setParentProcId(proto.getParentId()); + } + + if (proto.hasOwner()) { + proc.setOwner(proto.getOwner()); + } + + if (proto.hasTimeout()) { + proc.setTimeout(proto.getTimeout()); + } + + if (proto.getStackIdCount() > 0) { + proc.setStackIndexes(proto.getStackIdList()); + } + + if (proto.hasException()) { + assert proc.getState() == ProcedureState.FINISHED || + proc.getState() == ProcedureState.ROLLEDBACK : + "The procedure must be failed (waiting to rollback) or rolledback"; + proc.setFailure(RemoteProcedureException.fromProto(proto.getException())); + } + + if (proto.hasResult()) { + proc.setResult(proto.getResult().toByteArray()); + } + + // we want to call deserialize even when the stream is empty, mainly for testing. 
+ proc.deserializeStateData(proto.getStateData().newInput()); + + return proc; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java new file mode 100644 index 0000000..2e409cf --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureAbortedException.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Thrown when a procedure is aborted + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ProcedureAbortedException extends ProcedureException { + /** default constructor */ + public ProcedureAbortedException() { + super(); + } + + /** + * Constructor + * @param s message + */ + public ProcedureAbortedException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java new file mode 100644 index 0000000..9f922b1 --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureException.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ProcedureException extends IOException { + /** default constructor */ + public ProcedureException() { + super(); + } + + /** + * Constructor + * @param s message + */ + public ProcedureException(String s) { + super(s); + } + + public ProcedureException(Throwable t) { + super(t); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java new file mode 100644 index 0000000..86ad57e --- /dev/null +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -0,0 +1,1004 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.HashSet; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; +import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue; +import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; +import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.procedure2.util.StringUtils; + +import com.google.common.base.Preconditions; + +/** + * Thread Pool that executes the submitted procedures. + * The executor has a ProcedureStore associated. + * Each operation is logged and on restart the pending procedures are resumed. + * + * Unless the Procedure code throws an error (e.g. invalid user input) + * the procedure will complete (at some point in time), On restart the pending + * procedures are resumed and the once failed will be rolledback. 
+ * + * The user can add procedures to the executor via submitProcedure(proc) + * check for the finished state via isFinished(procId) + * and get the result via getResult(procId) + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ProcedureExecutor<TEnvironment> { + private static final Log LOG = LogFactory.getLog(ProcedureExecutor.class); + + Testing testing = null; + public static class Testing { + protected boolean killBeforeStoreUpdate = false; + protected boolean toggleKillBeforeStoreUpdate = false; + + protected boolean shouldKillBeforeStoreUpdate() { + final boolean kill = this.killBeforeStoreUpdate; + if (this.toggleKillBeforeStoreUpdate) { + this.killBeforeStoreUpdate = !kill; + LOG.warn("Toggle Kill before store update to: " + this.killBeforeStoreUpdate); + } + return kill; + } + } + + public interface ProcedureExecutorListener { + void procedureLoaded(long procId); + void procedureAdded(long procId); + void procedureFinished(long procId); + } + + /** + * Used by the TimeoutBlockingQueue to get the timeout interval of the procedure + */ + private static class ProcedureTimeoutRetriever implements TimeoutRetriever<Procedure> { + @Override + public long getTimeout(Procedure proc) { + return proc.getTimeRemaining(); + } + + @Override + public TimeUnit getTimeUnit(Procedure proc) { + return TimeUnit.MILLISECONDS; + } + } + + /** + * Internal cleaner that removes the completed procedure results after a TTL. + * NOTE: This is a special case handled in timeoutLoop(). 
+ * + * Since the client code looks more or less like: + * procId = master.doOperation() + * while (master.getProcResult(procId) == ProcInProgress); + * The master should not throw away the proc result as soon as the procedure is done + * but should wait a result request from the client (see executor.removeResult(procId)) + * The client will call something like master.isProcDone() or master.getProcResult() + * which will return the result/state to the client, and it will mark the completed + * proc as ready to delete. note that the client may not receive the response from + * the master (e.g. master failover) so, if we delay a bit the real deletion of + * the proc result the client will be able to get the result the next try. + */ + private static class CompletedProcedureCleaner<TEnvironment> extends Procedure<TEnvironment> { + private static final Log LOG = LogFactory.getLog(CompletedProcedureCleaner.class); + + private static final String CLEANER_INTERVAL_CONF_KEY = "hbase.procedure.cleaner.interval"; + private static final int DEFAULT_CLEANER_INTERVAL = 30 * 1000; // 30sec + + private static final String EVICT_TTL_CONF_KEY = "hbase.procedure.cleaner.evict.ttl"; + private static final int DEFAULT_EVICT_TTL = 15 * 60000; // 15min + + private static final String EVICT_ACKED_TTL_CONF_KEY ="hbase.procedure.cleaner.acked.evict.ttl"; + private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min + + private final Map<Long, ProcedureResult> completed; + private final ProcedureStore store; + private final Configuration conf; + + public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store, + final Map<Long, ProcedureResult> completedMap) { + // set the timeout interval that triggers the periodic-procedure + setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL)); + this.completed = completedMap; + this.store = store; + this.conf = conf; + } + + public void periodicExecute(final TEnvironment env) { + if 
(completed.isEmpty()) { + LOG.debug("no completed procedures to cleanup"); + return; + } + + final long evictTtl = conf.getInt(EVICT_TTL_CONF_KEY, DEFAULT_EVICT_TTL); + final long evictAckTtl = conf.getInt(EVICT_ACKED_TTL_CONF_KEY, DEFAULT_ACKED_EVICT_TTL); + + long now = EnvironmentEdgeManager.currentTime(); + Iterator<Map.Entry<Long, ProcedureResult>> it = completed.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry<Long, ProcedureResult> entry = it.next(); + ProcedureResult result = entry.getValue(); + + // TODO: Select TTL based on Procedure type + if ((result.hasClientAckTime() && (now - result.getClientAckTime()) >= evictAckTtl) || + (now - result.getLastUpdate()) >= evictTtl) { + LOG.debug("Evict completed procedure " + entry.getKey()); + store.delete(entry.getKey()); + it.remove(); + } + } + } + + @Override + protected Procedure[] execute(final TEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + protected void rollback(final TEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + protected boolean abort(final TEnvironment env) { + throw new UnsupportedOperationException(); + } + + @Override + public void serializeStateData(final OutputStream stream) { + throw new UnsupportedOperationException(); + } + + @Override + public void deserializeStateData(final InputStream stream) { + throw new UnsupportedOperationException(); + } + } + + /** + * Map the the procId returned by submitProcedure(), the Root-ProcID, to the ProcedureResult. + * Once a Root-Procedure completes (success or failure), the result will be added to this map. + * The user of ProcedureExecutor should call getResult(procId) to get the result. + */ + private final ConcurrentHashMap<Long, ProcedureResult> completed = + new ConcurrentHashMap<Long, ProcedureResult>(); + + /** + * Map the the procId returned by submitProcedure(), the Root-ProcID, to the RootProcedureState. 
+ * The RootProcedureState contains the execution stack of the Root-Procedure, + * It is added to the map by submitProcedure() and removed on procedure completion. + */ + private final ConcurrentHashMap<Long, RootProcedureState> rollbackStack = + new ConcurrentHashMap<Long, RootProcedureState>(); + + /** + * Helper map to lookup the live procedures by ID. + * This map contains every procedure. root-procedures and subprocedures. + */ + private final ConcurrentHashMap<Long, Procedure> procedures = + new ConcurrentHashMap<Long, Procedure>(); + + /** + * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state + * or periodic procedures. + */ + private final TimeoutBlockingQueue<Procedure> waitingTimeout = + new TimeoutBlockingQueue<Procedure>(new ProcedureTimeoutRetriever()); + + /** + * Queue that contains runnable procedures. + */ + private final ProcedureRunnableSet runnables; + + private final CopyOnWriteArrayList<ProcedureExecutorListener> listeners = + new CopyOnWriteArrayList<ProcedureExecutorListener>(); + + private final AtomicInteger activeExecutorCount = new AtomicInteger(0); + private final AtomicBoolean running = new AtomicBoolean(false); + private final AtomicLong lastProcId = new AtomicLong(-1); + private final TEnvironment environment; + private final ProcedureStore store; + private final Configuration conf; + + private Thread[] threads; + + public ProcedureExecutor(final Configuration conf, final TEnvironment environment, + final ProcedureStore store) { + this(conf, environment, store, new ProcedureSimpleRunQueue()); + } + + public ProcedureExecutor(final Configuration conf, final TEnvironment environment, + final ProcedureStore store, final ProcedureRunnableSet runqueue) { + this.environment = environment; + this.runnables = runqueue; + this.store = store; + this.conf = conf; + } + + private List<Map.Entry<Long, RootProcedureState>> load() throws IOException { + Preconditions.checkArgument(completed.isEmpty()); + 
Preconditions.checkArgument(rollbackStack.isEmpty()); + Preconditions.checkArgument(procedures.isEmpty()); + Preconditions.checkArgument(waitingTimeout.isEmpty()); + Preconditions.checkArgument(runnables.size() == 0); + + // 1. Load the procedures + Iterator<Procedure> loader = store.load(); + if (loader == null) { + lastProcId.set(0); + return null; + } + + long logMaxProcId = 0; + int runnablesCount = 0; + while (loader.hasNext()) { + Procedure proc = loader.next(); + procedures.put(proc.getProcId(), proc); + logMaxProcId = Math.max(logMaxProcId, proc.getProcId()); + LOG.debug("Loading procedure " + proc + + " state=" + proc.getState() + " isFailed=" + proc.hasException()); + if (!proc.hasParent() && !proc.isFinished()) { + rollbackStack.put(proc.getProcId(), new RootProcedureState()); + } + if (proc.getState() == ProcedureState.RUNNABLE) { + runnablesCount++; + } + } + assert lastProcId.get() < 0; + lastProcId.set(logMaxProcId); + + // 2. Initialize the stacks + TreeSet<Procedure> runnableSet = null; + HashSet<Procedure> waitingSet = null; + for (final Procedure proc: procedures.values()) { + Long rootProcId = getRootProcedureId(proc); + if (rootProcId == null) { + // The 'proc' was ready to run but the root procedure was rolledback? 
+ runnables.addBack(proc); + continue; + } + + if (!proc.hasParent() && proc.isFinished()) { + LOG.debug("The procedure is completed " + proc + + " state=" + proc.getState() + " isFailed=" + proc.hasException()); + assert !rollbackStack.containsKey(proc.getProcId()); + completed.put(proc.getProcId(), newResultFromProcedure(proc)); + continue; + } + + if (proc.hasParent() && !proc.isFinished()) { + Procedure parent = procedures.get(proc.getParentProcId()); + // corrupted procedures are handled later at step 3 + if (parent != null) { + parent.incChildrenLatch(); + } + } + + RootProcedureState procStack = rollbackStack.get(rootProcId); + procStack.loadStack(proc); + + switch (proc.getState()) { + case RUNNABLE: + if (runnableSet == null) { + runnableSet = new TreeSet<Procedure>(); + } + runnableSet.add(proc); + break; + case WAITING_TIMEOUT: + if (waitingSet == null) { + waitingSet = new HashSet<Procedure>(); + } + waitingSet.add(proc); + break; + case FINISHED: + if (proc.hasException()) { + // add the proc to the runnables to perform the rollback + runnables.addBack(proc); + break; + } + case ROLLEDBACK: + case INITIALIZING: + String msg = "Unexpected " + proc.getState() + " state for " + proc; + LOG.error(msg); + throw new UnsupportedOperationException(msg); + default: + break; + } + } + + // 3. 
Validate the stacks + List<Map.Entry<Long, RootProcedureState>> corrupted = null; + Iterator<Map.Entry<Long, RootProcedureState>> itStack = rollbackStack.entrySet().iterator(); + while (itStack.hasNext()) { + Map.Entry<Long, RootProcedureState> entry = itStack.next(); + RootProcedureState procStack = entry.getValue(); + if (procStack.isValid()) continue; + + for (Procedure proc: procStack.getSubprocedures()) { + procedures.remove(proc.getProcId()); + if (runnableSet != null) runnableSet.remove(proc); + if (waitingSet != null) waitingSet.remove(proc); + } + itStack.remove(); + if (corrupted == null) { + corrupted = new ArrayList<Map.Entry<Long, RootProcedureState>>(); + } + corrupted.add(entry); + } + + // 4. Push the runnables + if (runnableSet != null) { + // TODO: See ProcedureWALFormatReader.readInitEntry() some procedure + // may be started way before this stuff. + for (Procedure proc: runnableSet) { + if (!proc.hasParent()) { + sendProcedureLoadedNotification(proc.getProcId()); + } + runnables.addBack(proc); + } + } + return corrupted; + } + + public void start(int numThreads) throws IOException { + if (running.getAndSet(true)) { + LOG.warn("Already running"); + return; + } + + // We have numThreads executor + one timer thread used for timing out + // procedures and triggering periodic procedures. + threads = new Thread[numThreads + 1]; + LOG.info("Starting procedure executor threads=" + threads.length); + + // Initialize procedures executor + for (int i = 0; i < numThreads; ++i) { + threads[i] = new Thread() { + @Override + public void run() { + execLoop(); + } + }; + } + + // Initialize procedures timeout handler (this is the +1 thread) + threads[numThreads] = new Thread() { + @Override + public void run() { + timeoutLoop(); + } + }; + + // Acquire the store lease. + store.recoverLease(); + + // TODO: Split in two steps. 
+ // TODO: Handle corrupted procedure returned (probably just a WARN) + // The first one will make sure that we have the latest id, + // so we can start the threads and accept new procedures. + // The second step will do the actual load of old procedures. + load(); + + // Start the executors. Here we must have the lastProcId set. + for (int i = 0; i < threads.length; ++i) { + threads[i].start(); + } + + // Add completed cleaner + waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed)); + } + + public void stop() { + if (!running.getAndSet(false)) { + return; + } + + LOG.info("Stopping the procedure executor"); + runnables.signalAll(); + waitingTimeout.signalAll(); + } + + public void join() { + boolean interrupted = false; + + for (int i = 0; i < threads.length; ++i) { + try { + threads[i].join(); + } catch (InterruptedException ex) { + interrupted = true; + } + } + + if (interrupted) { + Thread.currentThread().interrupt(); + } + + completed.clear(); + rollbackStack.clear(); + procedures.clear(); + waitingTimeout.clear(); + runnables.clear(); + lastProcId.set(-1); + } + + public boolean isRunning() { + return running.get(); + } + + /** + * @return the number of execution threads. + */ + public int getNumThreads() { + return threads == null ? 0 : (threads.length - 1); + } + + public int getActiveExecutorCount() { + return activeExecutorCount.get(); + } + + public TEnvironment getEnvironment() { + return this.environment; + } + + public ProcedureStore getStore() { + return this.store; + } + + public void registerListener(ProcedureExecutorListener listener) { + this.listeners.add(listener); + } + + public boolean unregisterListener(ProcedureExecutorListener listener) { + return this.listeners.remove(listener); + } + + /** + * Add a new root-procedure to the executor. + * @param proc the new procedure to execute. 
+ * @return the procedure id, that can be used to monitor the operation + */ + public long submitProcedure(final Procedure proc) { + Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); + Preconditions.checkArgument(isRunning()); + Preconditions.checkArgument(lastProcId.get() >= 0); + Preconditions.checkArgument(!proc.hasParent()); + + // Initialize the Procedure ID + proc.setProcId(nextProcId()); + + // Commit the transaction + store.insert(proc, null); + LOG.debug("procedure " + proc + " added to the store"); + + // Create the rollback stack for the procedure + RootProcedureState stack = new RootProcedureState(); + rollbackStack.put(proc.getProcId(), stack); + + // Submit the new subprocedures + assert !procedures.containsKey(proc.getProcId()); + procedures.put(proc.getProcId(), proc); + sendProcedureAddedNotification(proc.getProcId()); + runnables.addBack(proc); + return proc.getProcId(); + } + + public ProcedureResult getResult(final long procId) { + return completed.get(procId); + } + + /** + * Return true if the procedure is finished. + * The state may be "completed successfully" or "failed and rolledback". + * Use getResult() to check the state or get the result data. + * @param procId the ID of the procedure to check + * @return true if the procedure execution is finished, otherwise false. + */ + public boolean isFinished(final long procId) { + return completed.containsKey(procId); + } + + /** + * Mark the specified completed procedure, as ready to remove. + * @param procId the ID of the procedure to remove + */ + public void removeResult(final long procId) { + ProcedureResult result = completed.get(procId); + if (result == null) { + assert !procedures.containsKey(procId) : "procId=" + procId + " is still running"; + LOG.debug("Procedure procId=" + procId + " already removed by the cleaner"); + return; + } + + // The CompletedProcedureCleaner will take care of deletion, once the TTL is expired. 
+ result.setClientAckTime(EnvironmentEdgeManager.currentTime()); + } + + /** + * Send an abort notification the specified procedure. + * Depending on the procedure implementation the abort can be considered or ignored. + * @param procId the procedure to abort + * @return true if the procedure exist and has received the abort, otherwise false. + */ + public boolean abort(final long procId) { + Procedure proc = procedures.get(procId); + if (proc != null) { + return proc.abort(getEnvironment()); + } + return false; + } + + public Map<Long, ProcedureResult> getResults() { + return Collections.unmodifiableMap(completed); + } + + public Procedure getProcedure(final long procId) { + return procedures.get(procId); + } + + /** + * Execution loop (N threads) + * while the executor is in a running state, + * fetch a procedure from the runnables queue and start the execution. + */ + private void execLoop() { + while (running.get()) { + Long procId = runnables.poll(); + Procedure proc = procId != null ? procedures.get(procId) : null; + if (proc == null) continue; + + try { + activeExecutorCount.incrementAndGet(); + execLoop(proc); + } finally { + activeExecutorCount.decrementAndGet(); + } + } + } + + private void execLoop(Procedure proc) { + LOG.debug("starting the execution of " + proc); + + Long rootProcId = getRootProcedureId(proc); + if (rootProcId == null) { + // The 'proc' was ready to run but the root procedure was rolledback + executeRollback(proc); + return; + } + + RootProcedureState procStack = rollbackStack.get(rootProcId); + if (procStack == null) return; + + do { + // Try to acquire the execution + if (!procStack.acquire(proc)) { + if (procStack.setRollback()) { + // we have the 'rollback-lock' we can start rollingback + executeRollback(rootProcId, procStack); + } else { + // if we can't rollback means that some child is still running. + // the rollback will be executed after all the children are done. 
+ // If the procedure was never executed, remove and mark it as rolledback. + if (!proc.wasExecuted()) { + executeRollback(proc); + } + } + break; + } + + // Execute the procedure + assert proc.getState() == ProcedureState.RUNNABLE; + if (proc.acquireLock(getEnvironment())) { + execProcedure(procStack, proc); + proc.releaseLock(getEnvironment()); + } else { + runnables.yield(proc); + } + procStack.release(proc); + + // allows to kill the executor before something is stored to the wal. + // useful to test the procedure recovery. + if (testing != null && !running.get()) { + break; + } + + if (proc.getProcId() == rootProcId && proc.isSuccess()) { + // Finalize the procedure state + LOG.info("Procedure completed in " + + StringUtils.humanTimeDiff(proc.elapsedTime()) + ": " + proc); + procedureFinished(proc); + break; + } + } while (procStack.isFailed()); + } + + private void timeoutLoop() { + while (running.get()) { + Procedure proc = waitingTimeout.poll(); + if (proc == null) continue; + + if (proc.getTimeRemaining() > 100) { + // got an early wake, maybe a stop? + // re-enqueue the task in case was not a stop or just a signal + waitingTimeout.add(proc); + continue; + } + + // ---------------------------------------------------------------------------- + // TODO-MAYBE: Should we provide a notification to the store with the + // full set of procedures pending and completed to write a compacted + // version of the log (in case is a log)? + // In theory no, procedures are have a short life, so at some point the store + // will have the tracker saying everything is in the last log. + // ---------------------------------------------------------------------------- + + // The CompletedProcedureCleaner is a special case, and it acts as a chore. + // instead of bringing the Chore class in, we reuse this timeout thread for + // this special case. 
+ if (proc instanceof CompletedProcedureCleaner) { + try { + ((CompletedProcedureCleaner)proc).periodicExecute(getEnvironment()); + } catch (Throwable e) { + LOG.error("ignoring CompletedProcedureCleaner exception: " + e.getMessage(), e); + } + proc.setStartTime(EnvironmentEdgeManager.currentTime()); + waitingTimeout.add(proc); + continue; + } + + // The procedure received an "abort-timeout", call abort() and + // add the procedure back in the queue for rollback. + if (proc.setTimeoutFailure()) { + long rootProcId = Procedure.getRootProcedureId(procedures, proc); + RootProcedureState procStack = rollbackStack.get(rootProcId); + procStack.abort(); + store.update(proc); + runnables.addFront(proc); + continue; + } + } + } + + /** + * Execute the rollback of the full procedure stack. + * Once the procedure is rolledback, the root-procedure will be visible as + * finished to user, and the result will be the fatal exception. + */ + private boolean executeRollback(final long rootProcId, final RootProcedureState procStack) { + Procedure rootProc = procedures.get(rootProcId); + RemoteProcedureException exception = rootProc.getException(); + if (exception == null) { + exception = procStack.getException(); + rootProc.setFailure(exception); + store.update(rootProc); + } + + List<Procedure> subprocStack = procStack.getSubprocedures(); + assert subprocStack != null : "called rollback with no steps executed rootProc=" + rootProc; + + int stackTail = subprocStack.size(); + boolean reuseLock = false; + while (stackTail --> 0) { + final Procedure proc = subprocStack.remove(stackTail); + + if (!reuseLock && !proc.acquireLock(getEnvironment())) { + // can't take a lock on the procedure, add the root-proc back on the + // queue waiting for the lock availability + runnables.yield(rootProc); + return false; + } + + executeRollback(proc); + boolean abortRollback = !running.get(); + + // If the next procedure is the same to this one + // (e.g. 
StateMachineProcedure reuse the same instance) + // we can avoid to lock/unlock each step + reuseLock = stackTail > 0 && (subprocStack.get(stackTail - 1) == proc) && !abortRollback; + if (!reuseLock) { + proc.releaseLock(getEnvironment()); + } + + // allows to kill the executor before something is stored to the wal. + // useful to test the procedure recovery. + if (abortRollback) { + return false; + } + } + + // Finalize the procedure state + LOG.info("Rolledback procedure " + rootProc + + " exec-time=" + StringUtils.humanTimeDiff(rootProc.elapsedTime()) + + " exception=" + exception.getMessage()); + procedureFinished(rootProc); + return true; + } + + /** + * Execute the rollback of the procedure step. + * It updates the store with the new state (stack index) + * or will remove completly the procedure in case it is a child. + */ + private void executeRollback(final Procedure proc) { + try { + proc.doRollback(getEnvironment()); + } catch (Throwable e) { + // Catch NullPointerExceptions or similar errors... + LOG.fatal("CODE-BUG: uncatched runtime exception for procedure: " + proc, e); + } + + // allows to kill the executor before something is stored to the wal. + // useful to test the procedure recovery. + if (testing != null && testing.shouldKillBeforeStoreUpdate()) { + stop(); + return; + } + + if (proc.removeStackIndex()) { + proc.setState(ProcedureState.ROLLEDBACK); + if (proc.hasParent()) { + store.delete(proc.getProcId()); + procedures.remove(proc.getProcId()); + } else { + store.update(proc); + } + } else { + store.update(proc); + } + } + + /** + * Executes the specified procedure + * - calls the doExecute() of the procedure + * - if the procedure execution didn't fail (e.g. invalid user input) + * - ...and returned subprocedures + * - the subprocedures are initialized. 
+ * - the subprocedures are added to the store + * - the subprocedures are added to the runnable queue + * - the procedure is now in a WAITING state, waiting for the subprocedures to complete + * - ...if there are no subprocedure + * - the procedure completed successfully + * - if there is a parent (WAITING) + * - the parent state will be set to RUNNABLE + * - in case of failure + * - the store is updated with the new state + * - the executor (caller of this method) will start the rollback of the procedure + */ + private void execProcedure(final RootProcedureState procStack, final Procedure procedure) { + Preconditions.checkArgument(procedure.getState() == ProcedureState.RUNNABLE); + + // Execute the procedure + boolean reExecute = false; + Procedure[] subprocs = null; + do { + reExecute = false; + try { + subprocs = procedure.doExecute(getEnvironment()); + if (subprocs != null && subprocs.length == 0) { + subprocs = null; + } + } catch (ProcedureYieldException e) { + if (LOG.isTraceEnabled()) { + LOG.trace("yield procedure: " + procedure); + } + runnables.yield(procedure); + return; + } catch (Throwable e) { + // Catch NullPointerExceptions or similar errors... 
+ String msg = "CODE-BUG: uncatched runtime exception for procedure: " + procedure; + LOG.error(msg, e); + procedure.setFailure(new RemoteProcedureException(msg, e)); + } + + if (!procedure.isFailed()) { + if (subprocs != null) { + if (subprocs.length == 1 && subprocs[0] == procedure) { + // quick-shortcut for a state machine like procedure + subprocs = null; + reExecute = true; + } else { + // yield the current procedure, and make the subprocedure runnable + for (int i = 0; i < subprocs.length; ++i) { + Procedure subproc = subprocs[i]; + if (subproc == null) { + String msg = "subproc[" + i + "] is null, aborting the procedure"; + procedure.setFailure(new RemoteProcedureException(msg, + new IllegalArgumentException(msg))); + subprocs = null; + break; + } + + assert subproc.getState() == ProcedureState.INITIALIZING; + subproc.setParentProcId(procedure.getProcId()); + subproc.setProcId(nextProcId()); + } + + if (!procedure.isFailed()) { + procedure.setChildrenLatch(subprocs.length); + switch (procedure.getState()) { + case RUNNABLE: + procedure.setState(ProcedureState.WAITING); + break; + case WAITING_TIMEOUT: + waitingTimeout.add(procedure); + break; + default: + break; + } + } + } + } else if (procedure.getState() == ProcedureState.WAITING_TIMEOUT) { + waitingTimeout.add(procedure); + } else { + // No subtask, so we are done + procedure.setState(ProcedureState.FINISHED); + } + } + + // Add the procedure to the stack + procStack.addRollbackStep(procedure); + + // allows to kill the executor before something is stored to the wal. + // useful to test the procedure recovery. 
+ if (testing != null && testing.shouldKillBeforeStoreUpdate()) { + stop(); + return; + } + + // Commit the transaction + if (subprocs != null && !procedure.isFailed()) { + if (LOG.isTraceEnabled()) { + LOG.trace("store add " + procedure + " children " + Arrays.toString(subprocs)); + } + store.insert(procedure, subprocs); + } else { + if (LOG.isTraceEnabled()) { + LOG.trace("store update " + procedure); + } + store.update(procedure); + } + + assert (reExecute && subprocs == null) || !reExecute; + } while (reExecute); + + // Submit the new subprocedures + if (subprocs != null && !procedure.isFailed()) { + for (int i = 0; i < subprocs.length; ++i) { + Procedure subproc = subprocs[i]; + assert !procedures.containsKey(subproc.getProcId()); + procedures.put(subproc.getProcId(), subproc); + runnables.addFront(subproc); + } + } + + if (procedure.isFinished() && procedure.hasParent()) { + Procedure parent = procedures.get(procedure.getParentProcId()); + if (parent == null) { + assert procStack.isRollingback(); + return; + } + + // If this procedure is the last child awake the parent procedure + if (LOG.isTraceEnabled()) { + LOG.trace(parent + " child is done: " + procedure); + } + if (parent.childrenCountDown() && parent.getState() == ProcedureState.WAITING) { + parent.setState(ProcedureState.RUNNABLE); + store.update(parent); + runnables.addFront(parent); + if (LOG.isTraceEnabled()) { + LOG.trace(parent + " all the children finished their work, resume."); + } + return; + } + } + } + + private void sendProcedureLoadedNotification(final long procId) { + if (!this.listeners.isEmpty()) { + for (ProcedureExecutorListener listener: this.listeners) { + try { + listener.procedureLoaded(procId); + } catch (Throwable e) { + LOG.error("the listener " + listener + " had an error: " + e.getMessage(), e); + } + } + } + } + + private void sendProcedureAddedNotification(final long procId) { + if (!this.listeners.isEmpty()) { + for (ProcedureExecutorListener listener: this.listeners) { + 
try { + listener.procedureAdded(procId); + } catch (Throwable e) { + LOG.error("the listener " + listener + " had an error: " + e.getMessage(), e); + } + } + } + } + + private void sendProcedureFinishedNotification(final long procId) { + if (!this.listeners.isEmpty()) { + for (ProcedureExecutorListener listener: this.listeners) { + try { + listener.procedureFinished(procId); + } catch (Throwable e) { + LOG.error("the listener " + listener + " had an error: " + e.getMessage(), e); + } + } + } + } + + private long nextProcId() { + long procId = lastProcId.incrementAndGet(); + if (procId < 0) { + while (!lastProcId.compareAndSet(procId, 0)) { + procId = lastProcId.get(); + if (procId >= 0) + break; + } + while (procedures.containsKey(procId)) { + procId = lastProcId.incrementAndGet(); + } + } + return procId; + } + + private Long getRootProcedureId(Procedure proc) { + return Procedure.getRootProcedureId(procedures, proc); + } + + private void procedureFinished(final Procedure proc) { + proc.completionCleanup(getEnvironment()); + completed.put(proc.getProcId(), newResultFromProcedure(proc)); + rollbackStack.remove(proc.getProcId()); + sendProcedureFinishedNotification(proc.getProcId()); + } + + private static ProcedureResult newResultFromProcedure(final Procedure proc) { + if (proc.isFailed()) { + return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException()); + } + return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult()); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/d71f54f8/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureFairRunQueues.java new file mode 100644 index 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hbase.procedure2;

import java.util.Map;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;

/**
 * This class is a container of queues that allows to select a queue
 * in a round robin fashion, considering priority of the queue.
 *
 * The quantum is just how many poll() will return the same object.
 * e.g. if quantum is 1 and you have A and B as object you'll get: A B A B
 * e.g. if quantum is 2 and you have A and B as object you'll get: A A B B A A B B
 * then the object priority is just a priority * quantum
 *
 * Example:
 *  - three queues (A, B, C) with priorities (1, 1, 2)
 *  - The first poll() will return A
 *  - The second poll() will return B
 *  - The third and fourth poll() will return C
 *  - and so on again and again.
 *
 * Queues are visited in the sort order of their keys. {@link #poll()} and
 * {@link #clear()} run under an internal lock; {@link #get}, {@link #add}
 * and {@link #remove} go straight to the underlying concurrent map.
 *
 * @param <TKey> key identifying a queue; determines the visiting order
 * @param <TQueue> the queue type handed out by {@link #poll()}
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ProcedureFairRunQueues<TKey, TQueue extends ProcedureFairRunQueues.FairObject> {
  private final ConcurrentSkipListMap<TKey, TQueue> objMap =
    new ConcurrentSkipListMap<TKey, TQueue>();

  private final ReentrantLock lock = new ReentrantLock();
  // Not referenced by any method of this class.
  private final Condition waitCond = lock.newCondition();
  private final int quantum;

  // Entry most recently selected by nextObject(); null until the first poll() and after clear().
  private Map.Entry<TKey, TQueue> current;
  // How many more poll() calls will keep returning 'current' before moving on.
  private int currentQuantum = 0;

  /** Contract for the objects managed by this container. */
  public interface FairObject {
    /** @return false if poll() should try to skip this object */
    boolean isAvailable();

    /** @return the weight multiplied by the quantum to get the number of consecutive poll() */
    int getPriority();
  }

  /**
   * @param quantum how many poll() will return the same object.
   */
  public ProcedureFairRunQueues(final int quantum) {
    this.quantum = quantum;
  }

  /**
   * @param key the key of the queue to look up
   * @return the queue mapped to the key, or null if there is none
   */
  public TQueue get(final TKey key) {
    return objMap.get(key);
  }

  /**
   * Maps the queue to the key, unless the key is already mapped.
   * @param key the key of the queue
   * @param queue the queue to add if the key is not mapped yet
   * @return the queue mapped to the key after the call: the pre-existing one
   *         if there was one, otherwise the specified queue
   */
  public TQueue add(final TKey key, final TQueue queue) {
    TQueue oldq = objMap.putIfAbsent(key, queue);
    return oldq != null ? oldq : queue;
  }

  /**
   * @param key the key of the queue to remove
   * @return the queue that was mapped to the key, or null if there was none
   */
  public TQueue remove(final TKey key) {
    return objMap.remove(key);
  }

  /**
   * Removes all the queues and resets the round-robin position.
   */
  public void clear() {
    lock.lock();
    try {
      // The quantum must be dropped together with 'current': poll() dereferences
      // 'current' without a null check whenever currentQuantum is greater than 0.
      currentQuantum = 0;
      current = null;
      objMap.clear();
    } finally {
      lock.unlock();
    }
  }

  /**
   * Selects the next queue in round-robin order.
   * The current queue keeps being returned until its quantum is exhausted,
   * then the queue with the next higher key is selected (wrapping around to
   * the first one). If the selected queue is not available, the following
   * queues are probed until an available one is found.
   *
   * NOTE: if the probing gets back to the starting queue without finding an
   * available one, the starting queue is returned even though it is not
   * available, so the caller still has to check isAvailable().
   *
   * @return the selected queue, or null if the container is empty
   */
  public TQueue poll() {
    lock.lock();
    try {
      TQueue queue;
      if (currentQuantum == 0) {
        // quantum exhausted (or first call): move to the next queue
        if (nextObject() == null) {
          // nothing here
          return null;
        }

        queue = current.getValue();
        currentQuantum = calculateQuantum(queue) - 1;
      } else {
        // keep serving the current queue
        currentQuantum--;
        queue = current.getValue();
      }

      if (!queue.isAvailable()) {
        Map.Entry<TKey, TQueue> last = current;
        // Try the next ones, until we find an available queue or we are back to the start
        do {
          if (nextObject() == null) {
            return null;
          }
        } while (current.getValue() != last.getValue() && !current.getValue().isAvailable());

        queue = current.getValue();
        currentQuantum = calculateQuantum(queue) - 1;
      }

      return queue;
    } finally {
      lock.unlock();
    }
  }

  /**
   * Moves 'current' to the entry with the next higher key,
   * or to the first entry if there is no current or no higher key.
   * @return the new current entry, or null if the map is empty
   */
  private Map.Entry<TKey, TQueue> nextObject() {
    Map.Entry<TKey, TQueue> next = null;

    // If we have already a key, try the next one
    if (current != null) {
      next = objMap.higherEntry(current.getKey());
    }

    // if there is no higher key, go back to the first
    current = (next != null) ? next : objMap.firstEntry();
    return current;
  }

  /**
   * @param fairObject the queue that is going to be served
   * @return how many consecutive poll() should return the queue (at least 1)
   */
  private int calculateQuantum(final TQueue fairObject) {
    // TODO
    return Math.max(1, fairObject.getPriority() * quantum);
  }
}
+ */ + +package org.apache.hadoop.hbase.procedure2; + +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Once a Procedure completes the ProcedureExecutor takes all the useful + * information of the procedure (e.g. exception/result) and creates a ProcedureResult. + * The user of the Procedure framework will get the procedure result with + * procedureExecutor.getResult(procId) + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class ProcedureResult { + private final RemoteProcedureException exception; + private final long lastUpdate; + private final long startTime; + private final byte[] result; + + private long clientAckTime = -1; + + public ProcedureResult(final long startTime, final long lastUpdate, + final RemoteProcedureException exception) { + this.lastUpdate = lastUpdate; + this.startTime = startTime; + this.exception = exception; + this.result = null; + } + + public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) { + this.lastUpdate = lastUpdate; + this.startTime = startTime; + this.exception = null; + this.result = result; + } + + public boolean isFailed() { + return exception != null; + } + + public RemoteProcedureException getException() { + return exception; + } + + public boolean hasResultData() { + return result != null; + } + + public byte[] getResult() { + return result; + } + + public long getStartTime() { + return startTime; + } + + public long getLastUpdate() { + return lastUpdate; + } + + public long executionTime() { + return lastUpdate - startTime; + } + + public boolean hasClientAckTime() { + return clientAckTime > 0; + } + + public long getClientAckTime() { + return clientAckTime; + } + + @InterfaceAudience.Private + protected void setClientAckTime(final long timestamp) { + this.clientAckTime = timestamp; + } +} \ No newline at end of file