HBASE-13415 Procedure V2 - Use nonces for double submits from client (Stephen Yuan Jiang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0c900fe7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0c900fe7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0c900fe7 Branch: refs/heads/branch-1.1 Commit: 0c900fe7eebf5d5253c4f2bf69f04127dcf0c80a Parents: bab0f0f Author: Stephen Yuan Jiang <syuanjiang...@gmail.com> Authored: Tue Jan 5 23:55:16 2016 -0800 Committer: Stephen Yuan Jiang <syuanjiang...@gmail.com> Committed: Tue Jan 5 23:55:16 2016 -0800 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/HBaseAdmin.java | 30 +- .../hadoop/hbase/protobuf/RequestConverter.java | 69 +- .../org/apache/hadoop/hbase/util/NonceKey.java | 65 + .../hadoop/hbase/procedure2/Procedure.java | 29 +- .../hbase/procedure2/ProcedureExecutor.java | 92 +- .../hbase/procedure2/ProcedureResult.java | 34 +- .../procedure2/ProcedureTestingUtility.java | 9 +- .../hbase/procedure2/TestProcedureRecovery.java | 39 +- .../hbase/protobuf/generated/MasterProtos.java | 2091 ++++++++++++++++-- .../protobuf/generated/ProcedureProtos.java | 257 ++- hbase-protocol/src/main/protobuf/Master.proto | 18 + .../src/main/protobuf/Procedure.proto | 4 + .../org/apache/hadoop/hbase/master/HMaster.java | 114 +- .../hadoop/hbase/master/MasterRpcServices.java | 51 +- .../hadoop/hbase/master/MasterServices.java | 70 +- .../hbase/regionserver/ServerNonceManager.java | 33 +- .../security/access/AccessControlLists.java | 4 +- .../visibility/VisibilityController.java | 2 +- .../hbase/client/TestHBaseAdminNoCluster.java | 5 + .../master/TestAssignmentManagerOnCluster.java | 2 +- .../hadoop/hbase/master/TestCatalogJanitor.java | 62 +- .../hadoop/hbase/master/TestProcedureConf.java | 7 +- .../MasterProcedureTestingUtility.java | 9 + .../procedure/TestAddColumnFamilyProcedure.java | 94 +- .../procedure/TestCreateTableProcedure.java | 46 +- .../TestDeleteColumnFamilyProcedure.java | 102 +- .../procedure/TestDeleteTableProcedure.java | 48 +- .../procedure/TestDisableTableProcedure.java | 44 +- .../procedure/TestEnableTableProcedure.java | 47 +- .../TestModifyColumnFamilyProcedure.java | 45 +- .../procedure/TestModifyTableProcedure.java | 27 +- .../procedure/TestTruncateTableProcedure.java | 15 +- 32 files changed, 3049 insertions(+), 515 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 5a0def3..38a980b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -206,6 +206,8 @@ public class HBaseAdmin implements Admin { private RpcRetryingCallerFactory rpcCallerFactory; + private NonceGenerator ng; + /** * Constructor. * See {@link #HBaseAdmin(Connection connection)} @@ -261,6 +263,8 @@ public class HBaseAdmin implements Admin { "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); + + this.ng = this.connection.getNonceGenerator(); } @Override @@ -637,7 +641,8 @@ public class HBaseAdmin implements Admin { new MasterCallable<CreateTableResponse>(getConnection()) { @Override public CreateTableResponse call(int callTimeout) throws ServiceException { - CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, splitKeys); + CreateTableRequest request = RequestConverter.buildCreateTableRequest( + desc, splitKeys, ng.getNonceGroup(), ng.newNonce()); return master.createTable(null, request); } }); @@ -807,7 +812,8 @@ public class HBaseAdmin implements Admin { new MasterCallable<DeleteTableResponse>(getConnection()) { @Override public DeleteTableResponse call(int callTimeout) throws ServiceException { - DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName); + DeleteTableRequest req = + RequestConverter.buildDeleteTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); return master.deleteTable(null,req); } }); @@ -919,7 +925,7 @@ public class HBaseAdmin implements Admin { @Override public Void call(int callTimeout) throws ServiceException { TruncateTableRequest req = RequestConverter.buildTruncateTableRequest( - tableName, preserveSplits); + tableName, preserveSplits, ng.getNonceGroup(), ng.newNonce()); master.truncateTable(null, req); return null; } @@ -1055,7 +1061,8 @@ public class HBaseAdmin implements Admin { @Override public EnableTableResponse call(int callTimeout) throws ServiceException { LOG.info("Started enable of " + tableName); - EnableTableRequest req = RequestConverter.buildEnableTableRequest(tableName); + EnableTableRequest req = + RequestConverter.buildEnableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); return master.enableTable(null,req); } }); @@ -1242,7 +1249,8 @@ public class HBaseAdmin implements Admin { @Override public DisableTableResponse call(int callTimeout) throws ServiceException { LOG.info("Started disable of " + tableName); - DisableTableRequest req = RequestConverter.buildDisableTableRequest(tableName); + DisableTableRequest req = + RequestConverter.buildDisableTableRequest(tableName, ng.getNonceGroup(), ng.newNonce()); return master.disableTable(null, req); } }); @@ -1521,7 +1529,8 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - AddColumnRequest req = RequestConverter.buildAddColumnRequest(tableName, column); + AddColumnRequest req = RequestConverter.buildAddColumnRequest( + tableName, column, ng.getNonceGroup(), ng.newNonce()); master.addColumn(null,req); return null; } @@ -1568,7 +1577,8 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest(tableName, columnName); + DeleteColumnRequest req = RequestConverter.buildDeleteColumnRequest( + tableName, columnName, ng.getNonceGroup(), ng.newNonce()); master.deleteColumn(null,req); return null; } @@ -1617,7 +1627,8 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest(tableName, descriptor); + ModifyColumnRequest req = RequestConverter.buildModifyColumnRequest( + tableName, descriptor, ng.getNonceGroup(), ng.newNonce()); master.modifyColumn(null,req); return null; } @@ -2452,7 +2463,8 @@ public class HBaseAdmin implements Admin { executeCallable(new MasterCallable<Void>(getConnection()) { @Override public Void call(int callTimeout) throws ServiceException { - ModifyTableRequest request = RequestConverter.buildModifyTableRequest(tableName, htd); + ModifyTableRequest request = RequestConverter.buildModifyTableRequest( + tableName, htd, ng.getNonceGroup(), ng.newNonce()); master.modifyTable(null, request); return null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java index 94fb892..f2fc545 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java @@ -1060,10 +1060,15 @@ public final class RequestConverter { * @return an AddColumnRequest */ public static AddColumnRequest buildAddColumnRequest( - final TableName tableName, final HColumnDescriptor column) { + final TableName tableName, + final HColumnDescriptor column, + final long nonceGroup, + final long nonce) { AddColumnRequest.Builder builder = AddColumnRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); builder.setColumnFamilies(column.convert()); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1075,10 +1080,15 @@ public final class RequestConverter { * @return a DeleteColumnRequest */ public static DeleteColumnRequest buildDeleteColumnRequest( - final TableName tableName, final byte [] columnName) { + final TableName tableName, + final byte [] columnName, + final long nonceGroup, + final long nonce) { DeleteColumnRequest.Builder builder = DeleteColumnRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName((tableName))); builder.setColumnName(ByteStringer.wrap(columnName)); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1090,10 +1100,15 @@ public final class RequestConverter { * @return an ModifyColumnRequest */ public static ModifyColumnRequest buildModifyColumnRequest( - final TableName tableName, final HColumnDescriptor column) { + final TableName tableName, + final HColumnDescriptor column, + final long nonceGroup, + final long nonce) { ModifyColumnRequest.Builder builder = ModifyColumnRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName((tableName))); builder.setColumnFamilies(column.convert()); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1175,9 +1190,14 @@ public final class RequestConverter { * @param tableName * @return a DeleteTableRequest */ - public static DeleteTableRequest buildDeleteTableRequest(final TableName tableName) { + public static DeleteTableRequest buildDeleteTableRequest( + final TableName tableName, + final long nonceGroup, + final long nonce) { DeleteTableRequest.Builder builder = DeleteTableRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1188,11 +1208,16 @@ public final class RequestConverter { * @param preserveSplits True if the splits should be preserved * @return a TruncateTableRequest */ - public static TruncateTableRequest buildTruncateTableRequest(final TableName tableName, - boolean preserveSplits) { + public static TruncateTableRequest buildTruncateTableRequest( + final TableName tableName, + final boolean preserveSplits, + final long nonceGroup, + final long nonce) { TruncateTableRequest.Builder builder = TruncateTableRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); builder.setPreserveSplits(preserveSplits); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1202,9 +1227,14 @@ public final class RequestConverter { * @param tableName * @return an EnableTableRequest */ - public static EnableTableRequest buildEnableTableRequest(final TableName tableName) { + public static EnableTableRequest buildEnableTableRequest( + final TableName tableName, + final long nonceGroup, + final long nonce) { EnableTableRequest.Builder builder = EnableTableRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName(tableName)); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1214,9 +1244,14 @@ public final class RequestConverter { * @param tableName * @return a DisableTableRequest */ - public static DisableTableRequest buildDisableTableRequest(final TableName tableName) { + public static DisableTableRequest buildDisableTableRequest( + final TableName tableName, + final long nonceGroup, + final long nonce) { DisableTableRequest.Builder builder = DisableTableRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName((tableName))); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1228,7 +1263,10 @@ public final class RequestConverter { * @return a CreateTableRequest */ public static CreateTableRequest buildCreateTableRequest( - final HTableDescriptor hTableDesc, final byte [][] splitKeys) { + final HTableDescriptor hTableDesc, + final byte [][] splitKeys, + final long nonceGroup, + final long nonce) { CreateTableRequest.Builder builder = CreateTableRequest.newBuilder(); builder.setTableSchema(hTableDesc.convert()); if (splitKeys != null) { @@ -1236,6 +1274,8 @@ public final class RequestConverter { builder.addSplitKeys(ByteStringer.wrap(splitKey)); } } + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1248,10 +1288,15 @@ public final class RequestConverter { * @return a ModifyTableRequest */ public static ModifyTableRequest buildModifyTableRequest( - final TableName tableName, final HTableDescriptor hTableDesc) { + final TableName tableName, + final HTableDescriptor hTableDesc, + final long nonceGroup, + final long nonce) { ModifyTableRequest.Builder builder = ModifyTableRequest.newBuilder(); builder.setTableName(ProtobufUtil.toProtoTableName((tableName))); builder.setTableSchema(hTableDesc.convert()); + builder.setNonceGroup(nonceGroup); + builder.setNonce(nonce); return builder.build(); } @@ -1353,7 +1398,9 @@ public final class RequestConverter { * @param synchronous * @return a SetBalancerRunningRequest */ - public static SetBalancerRunningRequest buildSetBalancerRunningRequest(boolean on, boolean synchronous) { + public static SetBalancerRunningRequest buildSetBalancerRunningRequest( + boolean on, + boolean synchronous) { return SetBalancerRunningRequest.newBuilder().setOn(on).setSynchronous(synchronous).build(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java new file mode 100644 index 0000000..9c7c72a --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/NonceKey.java @@ -0,0 +1,65 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + + /** + * This implementation is not smart and just treats nonce group and nonce as random bits. + */ + // TODO: we could use pure byte arrays, but then we wouldn't be able to use hash map. +@InterfaceAudience.Private +public class NonceKey { + private long group; + private long nonce; + + public NonceKey(long group, long nonce) { + assert nonce != HConstants.NO_NONCE; + this.group = group; + this.nonce = nonce; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || !(obj instanceof NonceKey)) { + return false; + } + NonceKey nk = ((NonceKey)obj); + return this.nonce == nk.nonce && this.group == nk.group; + } + + @Override + public int hashCode() { + return (int)((group >> 32) ^ group ^ (nonce >> 32) ^ nonce); + } + + @Override + public String toString() { + return "[" + group + ":" + nonce + "]"; + } + + public long getNonceGroup() { + return group; + } + + public long getNonce() { + return nonce; + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java index 338fcad..e5da8b4 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/Procedure.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeoutException; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.procedure2.util.StringUtils; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.util.ByteStringer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.NonceKey; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; @@ -78,6 +80,8 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { private RemoteProcedureException exception = null; private byte[] result = null; + private NonceKey nonceKey = null; + /** * The main code of the procedure. It must be idempotent since execute() * may be called multiple time in case of machine failure in the middle @@ -237,6 +241,10 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { return parentProcId; } + public NonceKey getNonceKey() { + return nonceKey; + } + /** * @return true if the procedure has failed. * true may mean failed but not yet rolledback or failed and rolledback. @@ -389,6 +397,15 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { } /** + * Called by the ProcedureExecutor to set the value to the newly created procedure. + */ + @VisibleForTesting + @InterfaceAudience.Private + protected void setNonceKey(final NonceKey nonceKey) { + this.nonceKey = nonceKey; + } + + /** * Internal method called by the ProcedureExecutor that starts the * user-level code execute(). */ @@ -621,6 +638,11 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { builder.setStateData(stateStream.toByteString()); } + if (proc.getNonceKey() != null) { + builder.setNonceGroup(proc.getNonceKey().getNonceGroup()); + builder.setNonce(proc.getNonceKey().getNonce()); + } + return builder.build(); } @@ -672,9 +694,14 @@ public abstract class Procedure<TEnvironment> implements Comparable<Procedure> { proc.setResult(proto.getResult().toByteArray()); } + if (proto.getNonce() != HConstants.NO_NONCE) { + NonceKey nonceKey = new NonceKey(proto.getNonceGroup(), proto.getNonce()); + proc.setNonceKey(nonceKey); + } + // we want to call deserialize even when the stream is empty, mainly for testing. proc.deserializeStateData(proto.getStateData().newInput()); return proc; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java index d615429..13f6b1a 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureExecutor.java @@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; @@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue; import org.apache.hadoop.hbase.procedure2.util.TimeoutBlockingQueue.TimeoutRetriever; import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos.ProcedureState; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.NonceKey; import org.apache.hadoop.hbase.util.Pair; import com.google.common.base.Preconditions; @@ -134,14 +136,17 @@ public class ProcedureExecutor<TEnvironment> { private static final int DEFAULT_ACKED_EVICT_TTL = 5 * 60000; // 5min private final Map<Long, ProcedureResult> completed; + private final Map<NonceKey, Long> nonceKeysToProcIdsMap; private final ProcedureStore store; private final Configuration conf; public CompletedProcedureCleaner(final Configuration conf, final ProcedureStore store, - final Map<Long, ProcedureResult> completedMap) { + final Map<Long, ProcedureResult> completedMap, + final Map<NonceKey, Long> nonceKeysToProcIdsMap) { // set the timeout interval that triggers the periodic-procedure setTimeout(conf.getInt(CLEANER_INTERVAL_CONF_KEY, DEFAULT_CLEANER_INTERVAL)); this.completed = completedMap; + this.nonceKeysToProcIdsMap = nonceKeysToProcIdsMap; this.store = store; this.conf = conf; } @@ -171,6 +176,11 @@ public class ProcedureExecutor<TEnvironment> { } store.delete(entry.getKey()); it.remove(); + + NonceKey nonceKey = result.getNonceKey(); + if (nonceKey != null) { + nonceKeysToProcIdsMap.remove(nonceKey); + } } } } @@ -225,6 +235,13 @@ public class ProcedureExecutor<TEnvironment> { new ConcurrentHashMap<Long, Procedure>(); /** + * Helper map to lookup whether the procedure already issued from the same client. + * This map contains every root procedure. + */ + private ConcurrentHashMap<NonceKey, Long> nonceKeysToProcIdsMap = + new ConcurrentHashMap<NonceKey, Long>(); + + /** * Timeout Queue that contains Procedures in a WAITING_TIMEOUT state * or periodic procedures. */ @@ -292,6 +309,12 @@ public class ProcedureExecutor<TEnvironment> { if (!proc.hasParent() && !proc.isFinished()) { rollbackStack.put(proc.getProcId(), new RootProcedureState()); } + + // add the nonce to the map + if (proc.getNonceKey() != null) { + nonceKeysToProcIdsMap.put(proc.getNonceKey(), proc.getProcId()); + } + if (proc.getState() == ProcedureState.RUNNABLE) { runnablesCount++; } @@ -317,6 +340,7 @@ public class ProcedureExecutor<TEnvironment> { } assert !rollbackStack.containsKey(proc.getProcId()); completed.put(proc.getProcId(), newResultFromProcedure(proc)); + continue; } @@ -439,7 +463,8 @@ public class ProcedureExecutor<TEnvironment> { } // Add completed cleaner - waitingTimeout.add(new CompletedProcedureCleaner(conf, store, completed)); + waitingTimeout.add( + new CompletedProcedureCleaner(conf, store, completed, nonceKeysToProcIdsMap)); } public void stop() { @@ -470,6 +495,7 @@ public class ProcedureExecutor<TEnvironment> { completed.clear(); rollbackStack.clear(); procedures.clear(); + nonceKeysToProcIdsMap.clear(); waitingTimeout.clear(); runnables.clear(); lastProcId.set(-1); @@ -512,13 +538,53 @@ public class ProcedureExecutor<TEnvironment> { * @return the procedure id, that can be used to monitor the operation */ public long submitProcedure(final Procedure proc) { + return submitProcedure(proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + /** + * Add a new root-procedure to the executor. + * @param proc the new procedure to execute. + * @param nonceGroup + * @param nonce + * @return the procedure id, that can be used to monitor the operation + */ + public long submitProcedure( + final Procedure proc, + final long nonceGroup, + final long nonce) { Preconditions.checkArgument(proc.getState() == ProcedureState.INITIALIZING); Preconditions.checkArgument(isRunning()); Preconditions.checkArgument(lastProcId.get() >= 0); Preconditions.checkArgument(!proc.hasParent()); - // Initialize the Procedure ID - proc.setProcId(nextProcId()); + Long currentProcId; + + // The following part of the code has to be synchronized to prevent multiple request + // with the same nonce to execute at the same time. + synchronized (this) { + // Check whether the proc exists. If exist, just return the proc id. + // This is to prevent the same proc to submit multiple times (it could happen + // when client could not talk to server and resubmit the same request). + NonceKey noncekey = null; + if (nonce != HConstants.NO_NONCE) { + noncekey = new NonceKey(nonceGroup, nonce); + currentProcId = nonceKeysToProcIdsMap.get(noncekey); + if (currentProcId != null) { + // Found the proc + return currentProcId; + } + } + + // Initialize the Procedure ID + currentProcId = nextProcId(); + proc.setProcId(currentProcId); + + // This is new procedure. Set the noncekey and insert into the map. + if (noncekey != null) { + proc.setNonceKey(noncekey); + nonceKeysToProcIdsMap.put(noncekey, currentProcId); + } + } // end of synchronized (this) // Commit the transaction store.insert(proc, null); @@ -528,14 +594,14 @@ public class ProcedureExecutor<TEnvironment> { // Create the rollback stack for the procedure RootProcedureState stack = new RootProcedureState(); - rollbackStack.put(proc.getProcId(), stack); + rollbackStack.put(currentProcId, stack); // Submit the new subprocedures - assert !procedures.containsKey(proc.getProcId()); - procedures.put(proc.getProcId(), proc); - sendProcedureAddedNotification(proc.getProcId()); + assert !procedures.containsKey(currentProcId); + procedures.put(currentProcId, proc); + sendProcedureAddedNotification(currentProcId); runnables.addBack(proc); - return proc.getProcId(); + return currentProcId; } public ProcedureResult getResult(final long procId) { @@ -1090,8 +1156,10 @@ public class ProcedureExecutor<TEnvironment> { private static ProcedureResult newResultFromProcedure(final Procedure proc) { if (proc.isFailed()) { - return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getException()); + return new ProcedureResult( + proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getException()); } - return new ProcedureResult(proc.getStartTime(), proc.getLastUpdate(), proc.getResult()); + return new ProcedureResult( + proc.getNonceKey(), proc.getStartTime(), proc.getLastUpdate(), proc.getResult()); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java index 98c293b..ff5407f 100644 --- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java +++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/ProcedureResult.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.procedure2; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.NonceKey; /** * Once a Procedure completes the ProcedureExecutor takes all the useful @@ -30,6 +31,7 @@ import org.apache.hadoop.hbase.classification.InterfaceStability; @InterfaceAudience.Private @InterfaceStability.Evolving public class ProcedureResult { + private final NonceKey nonceKey; private final RemoteProcedureException exception; private final long lastUpdate; private final long startTime; @@ -37,21 +39,39 @@ public class ProcedureResult { private long clientAckTime = -1; - public ProcedureResult(final long startTime, final long lastUpdate, + public ProcedureResult( + final NonceKey nonceKey, + final long startTime, + final long lastUpdate, final RemoteProcedureException exception) { - this.lastUpdate = lastUpdate; - this.startTime = startTime; - this.exception = exception; - this.result = null; + this(nonceKey, exception, lastUpdate, startTime, null); + } + + public ProcedureResult( + final NonceKey nonceKey, + final long startTime, + final long lastUpdate, + final byte[] result) { + this(nonceKey, null, lastUpdate, startTime, result); } - public ProcedureResult(final long startTime, final long lastUpdate, final byte[] result) { + public ProcedureResult( + final NonceKey nonceKey, + final RemoteProcedureException exception, + final long lastUpdate, + final long startTime, + final byte[] result) { + this.nonceKey = nonceKey; + this.exception = exception; this.lastUpdate = lastUpdate; this.startTime = startTime; - this.exception = null; this.result = result; } + public NonceKey getNonceKey() { + return nonceKey; + } + public boolean isFailed() { return exception != null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java index da590b8..f172834 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/ProcedureTestingUtility.java @@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.procedure2.store.ProcedureStore; import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore; @@ -112,7 +113,13 @@ public class ProcedureTestingUtility { } public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc) { - long procId = procExecutor.submitProcedure(proc); + return submitAndWait(procExecutor, proc, HConstants.NO_NONCE, HConstants.NO_NONCE); + } + + public static <TEnv> long submitAndWait(ProcedureExecutor<TEnv> procExecutor, Procedure proc, + final long nonceGroup, + final long nonce) { + long procId = procExecutor.submitProcedure(proc, nonceGroup, nonce); waitProcedure(procExecutor, procId); return procId; } http://git-wip-us.apache.org/repos/asf/hbase/blob/0c900fe7/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java ---------------------------------------------------------------------- diff --git a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java index f21b6fa..e69faf5 100644 --- a/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java +++ b/hbase-procedure/src/test/java/org/apache/hadoop/hbase/procedure2/TestProcedureRecovery.java @@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; - import org.junit.After; import org.junit.Before; import org.junit.Assert; @@ -76,6 +75,9 @@ public class TestProcedureRecovery { procStore.start(PROCEDURE_EXECUTOR_SLOTS); procExecutor.start(PROCEDURE_EXECUTOR_SLOTS); procSleepInterval = 0; + + ProcedureTestingUtility.setToggleKillBeforeStoreUpdate(procExecutor, false); + ProcedureTestingUtility.setKillBeforeStoreUpdate(procExecutor, false); } @After @@ -285,6 +287,41 @@ public class TestProcedureRecovery { ProcedureTestingUtility.assertIsAbortException(result); } + @Test(timeout=30000) + public void testCompletedProcWithSameNonce() throws Exception { + final long nonceGroup = 123; + final long nonce = 2222; + Procedure proc = new TestSingleStepProcedure(); + // Submit a proc and wait for its completion + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); + + // Restart + restart(); + Procedure proc2 = new TestSingleStepProcedure(); + // Submit a procedure with the same nonce and expect the same procedure would return. + long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); + assertTrue(procId == procId2); + + ProcedureResult result = procExecutor.getResult(procId2); + ProcedureTestingUtility.assertProcNotFailed(result); + } + + @Test(timeout=30000) + public void testRunningProcWithSameNonce() throws Exception { + final long nonceGroup = 456; + final long nonce = 33333; + Procedure proc = new TestMultiStepProcedure(); + long procId = ProcedureTestingUtility.submitAndWait(procExecutor, proc, nonceGroup, nonce); + + // Restart + restart(); + Procedure proc2 = new TestMultiStepProcedure(); + // Submit a procedure with the same nonce and expect the same procedure would return. + long procId2 = ProcedureTestingUtility.submitAndWait(procExecutor, proc2, nonceGroup, nonce); + // The original proc is not completed and the new submission should have the same proc Id. + assertTrue(procId == procId2); + } + public static class TestStateMachineProcedure extends StateMachineProcedure<Void, TestStateMachineProcedure.State> { enum State { STATE_1, STATE_2, STATE_3, DONE }