Repository: hbase Updated Branches: refs/heads/branch-1.3 3737c4696 -> d316bf7c4
HBASE-16664 Timeout logic in AsyncProcess is broken Signed-off-by: chenheng <chenh...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d316bf7c Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d316bf7c Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d316bf7c Branch: refs/heads/branch-1.3 Commit: d316bf7c4b3fd4de7a108c4e025c8ddb8dc0a0b8 Parents: 3737c46 Author: Phil Yang <ud1...@gmail.com> Authored: Tue Oct 11 17:12:54 2016 +0800 Committer: chenheng <chenh...@apache.org> Committed: Thu Oct 13 19:52:02 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 75 +++++--- .../hbase/client/BufferedMutatorImpl.java | 21 ++- .../hadoop/hbase/client/ConnectionManager.java | 3 +- .../org/apache/hadoop/hbase/client/HTable.java | 22 ++- .../hadoop/hbase/client/HTableMultiplexer.java | 5 +- .../hbase/client/MultiServerCallable.java | 15 +- .../hbase/client/RetryingTimeTracker.java | 3 +- .../hadoop/hbase/client/TestAsyncProcess.java | 22 ++- .../hbase/client/HConnectionTestingUtility.java | 5 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 172 +++++++++++++++++-- 10 files changed, 276 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index cdcb1b2..32de1e3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -232,7 +232,8 @@ class AsyncProcess { protected final long pause; 
protected int numTries; protected int serverTrackerTimeout; - protected int timeout; + protected int rpcTimeout; + protected int operationTimeout; protected long primaryCallTimeoutMicroseconds; // End configuration settings. @@ -275,7 +276,8 @@ class AsyncProcess { } public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, - RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) { + RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory, + int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("HConnection cannot be null."); } @@ -290,8 +292,9 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.rpcTimeout = rpcTimeout; + this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, @@ -336,6 +339,14 @@ class AsyncProcess { DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS); } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + } + /** * @return pool if non null, otherwise returns this.pool if non null, otherwise throws * RuntimeException @@ -561,12 +572,12 @@ class AsyncProcess { */ public <CResult> AsyncRequestFuture submitAll(TableName tableName, List<? 
extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { - return submitAll(null, tableName, rows, callback, results, null, timeout); + return submitAll(null, tableName, rows, callback, results, null, operationTimeout, rpcTimeout); } public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) { - return submitAll(pool, tableName, rows, callback, results, null, timeout); + return submitAll(pool, tableName, rows, callback, results, null, operationTimeout, rpcTimeout); } /** * Submit immediately the list of rows, whatever the server status. Kept for backward @@ -580,7 +591,7 @@ class AsyncProcess { */ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName, List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results, - PayloadCarryingServerCallable callable, int curTimeout) { + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size()); // The position will be used by the processBatch to match the object array returned. 
@@ -600,7 +611,7 @@ class AsyncProcess { } AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture( tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null, - callable, curTimeout); + callable, operationTimeout, rpcTimeout); ars.groupAndSendMultiAction(actions, 1); return ars; } @@ -752,12 +763,12 @@ class AsyncProcess { if (callable == null) { callable = createCallable(server, tableName, multiAction); } - RpcRetryingCaller<MultiResponse> caller = createCaller(callable); + RpcRetryingCaller<MultiResponse> caller = createCaller(callable, rpcTimeout); try { if (callsInProgress != null) { callsInProgress.add(callable); } - res = caller.callWithoutRetries(callable, currentCallTotalTimeout); + res = caller.callWithoutRetries(callable, operationTimeout); if (res == null) { // Cancelled return; @@ -823,11 +834,14 @@ class AsyncProcess { private final boolean hasAnyReplicaGets; private final long nonceGroup; private PayloadCarryingServerCallable currentCallable; - private int currentCallTotalTimeout; + private int operationTimeout; + private int rpcTimeout; + private RetryingTimeTracker tracker; public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, boolean needResults, Object[] results, - Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, int timeout) { + Batch.Callback<CResult> callback, PayloadCarryingServerCallable callable, + int operationTimeout, int rpcTimeout) { this.pool = pool; this.callback = callback; this.nonceGroup = nonceGroup; @@ -897,7 +911,11 @@ class AsyncProcess { this.errorsByServer = createServerErrorTracker(); this.errors = (globalErrors != null) ? 
globalErrors : new BatchErrors(); this.currentCallable = callable; - this.currentCallTotalTimeout = timeout; + this.operationTimeout = operationTimeout; + this.rpcTimeout = rpcTimeout; + if (callable == null) { + this.tracker = new RetryingTimeTracker().start(); + } } public Set<PayloadCarryingServerCallable> getCallsInProgress() { @@ -1717,6 +1735,16 @@ class AsyncProcess { waitUntilDone(); return results; } + + /** + * Create a callable. Isolated to be easily overridden in the tests. + */ + @VisibleForTesting + protected MultiServerCallable<Row> createCallable(final ServerName server, + TableName tableName, final MultiAction<Row> multi) { + return new MultiServerCallable<Row>(connection, tableName, server, + AsyncProcess.this.rpcFactory, multi, rpcTimeout, tracker); + } } private void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) { @@ -1738,10 +1766,10 @@ class AsyncProcess { protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture( TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults, - PayloadCarryingServerCallable callable, int curTimeout) { + PayloadCarryingServerCallable callable, int operationTimeout, int rpcTimeout) { return new AsyncRequestFutureImpl<CResult>( tableName, actions, nonceGroup, getPool(pool), needResults, - results, callback, callable, curTimeout); + results, callback, callable, operationTimeout, rpcTimeout); } @VisibleForTesting @@ -1750,24 +1778,17 @@ class AsyncProcess { TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool, Batch.Callback<CResult> callback, Object[] results, boolean needResults) { return createAsyncRequestFuture( - tableName, actions, nonceGroup, pool, callback, results, needResults, null, timeout); - } - - /** - * Create a callable. Isolated to be easily overridden in the tests. 
- */ - @VisibleForTesting - protected MultiServerCallable<Row> createCallable(final ServerName server, - TableName tableName, final MultiAction<Row> multi) { - return new MultiServerCallable<Row>(connection, tableName, server, this.rpcFactory, multi); + tableName, actions, nonceGroup, pool, callback, results, needResults, null, + operationTimeout, rpcTimeout); } /** * Create a caller. Isolated to be easily overridden in the tests. */ @VisibleForTesting - protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable) { - return rpcCallerFactory.<MultiResponse> newCaller(); + protected RpcRetryingCaller<MultiResponse> createCaller(PayloadCarryingServerCallable callable, + int rpcTimeout) { + return rpcCallerFactory.<MultiResponse> newCaller(rpcTimeout); } @VisibleForTesting http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 273f2e4..d722821 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -71,6 +72,8 @@ public class BufferedMutatorImpl implements BufferedMutator { private final int maxKeyValueSize; private boolean closed = false; private final 
ExecutorService pool; + private int rpcTimeout; + private int operationTimeout; @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -92,9 +95,13 @@ public class BufferedMutatorImpl implements BufferedMutator { params.getWriteBufferSize() : tableConf.getWriteBufferSize(); this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); - + this.rpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.operationTimeout = conn.getConfiguration().getInt( + HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, rpcTimeout); } @Override @@ -279,6 +286,16 @@ public class BufferedMutatorImpl implements BufferedMutator { return this.writeBufferSize; } + public void setRpcTimeout(int rpcTimeout) { + this.rpcTimeout = rpcTimeout; + this.ap.setRpcTimeout(rpcTimeout); + } + + public void setOperationTimeout(int operationTimeout) { + this.operationTimeout = operationTimeout; + this.ap.setOperationTimeout(operationTimeout); + } + /** * This is used for legacy purposes in {@link HTable#getWriteBuffer()} only. This should not be * called from production uses. 
http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index b055884..4e9d208 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -2333,7 +2333,8 @@ class ConnectionManager { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, + rpcTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index efa03c6..4c31099 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -1035,13 +1035,12 @@ public class HTable implements HTableInterface, RegionLocator { */ @Override public void mutateRow(final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); + final RetryingTimeTracker tracker = new RetryingTimeTracker().start(); PayloadCarryingServerCallable<MultiResponse> callable = new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory) { @Override 
public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); controller.setPriority(tableName); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { @@ -1071,7 +1070,7 @@ public class HTable implements HTableInterface, RegionLocator { } }; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, null, callable, operationTimeout); + null, null, callable, operationTimeout, rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1344,13 +1343,12 @@ public class HTable implements HTableInterface, RegionLocator { public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException { - final RetryingTimeTracker tracker = new RetryingTimeTracker(); + final RetryingTimeTracker tracker = new RetryingTimeTracker().start(); PayloadCarryingServerCallable<MultiResponse> callable = new PayloadCarryingServerCallable<MultiResponse>(connection, getName(), rm.getRow(), rpcControllerFactory) { @Override public MultiResponse call(int callTimeout) throws IOException { - tracker.start(); controller.setPriority(tableName); int remainingTime = tracker.getRemainingTime(callTimeout); if (remainingTime == 0) { @@ -1384,7 +1382,7 @@ public class HTable implements HTableInterface, RegionLocator { * */ Object[] results = new Object[rm.getMutations().size()]; AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(), - null, results, callable, operationTimeout); + null, results, callable, operationTimeout, rpcTimeout); ars.waitUntilDone(); if (ars.hasError()) { throw ars.getErrors(); @@ -1800,6 +1798,10 @@ public class HTable implements HTableInterface, RegionLocator { public void setOperationTimeout(int operationTimeout) { this.operationTimeout = operationTimeout; + if (mutator != null) { + mutator.setOperationTimeout(operationTimeout); + } + 
multiAp.setOperationTimeout(operationTimeout); } public int getOperationTimeout() { @@ -1808,6 +1810,10 @@ public class HTable implements HTableInterface, RegionLocator { @Override public void setRpcTimeout(int rpcTimeout) { this.rpcTimeout = rpcTimeout; + if (mutator != null) { + mutator.setRpcTimeout(rpcTimeout); + } + multiAp.setRpcTimeout(rpcTimeout); } @Override public int getRpcTimeout() { @@ -1891,7 +1897,7 @@ public class HTable implements HTableInterface, RegionLocator { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + true, RpcControllerFactory.instantiate(configuration), rpcTimeout); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback<ClientProtos.CoprocessorServiceResult>() { @@ -1941,6 +1947,8 @@ public class HTable implements HTableInterface, RegionLocator { .writeBufferSize(connConfiguration.getWriteBufferSize()) .maxKeyValueSize(connConfiguration.getMaxKeyValueSize()) ); + mutator.setRpcTimeout(rpcTimeout); + mutator.setOperationTimeout(operationTimeout); } return mutator; } http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index dfb0104..6863eab 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -450,7 +450,10 @@ public class HTableMultiplexer { this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); RpcRetryingCallerFactory rpcCallerFactory = 
RpcRetryingCallerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, + rpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java index d0b4c81..115ba33 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java @@ -52,9 +52,12 @@ import com.google.protobuf.ServiceException; class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse> { private final MultiAction<R> multiAction; private final boolean cellBlock; + private final RetryingTimeTracker tracker; + private final int rpcTimeout; MultiServerCallable(final ClusterConnection connection, final TableName tableName, - final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi) { + final ServerName location, RpcControllerFactory rpcFactory, final MultiAction<R> multi, + int rpcTimeout, RetryingTimeTracker tracker) { super(connection, tableName, null, rpcFactory); this.multiAction = multi; // RegionServerCallable has HRegionLocation field, but this is a multi-region request. 
@@ -62,6 +65,8 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse // we will store the server here, and throw if someone tries to obtain location/regioninfo. this.location = new HRegionLocation(null, location); this.cellBlock = isCellBlock(); + this.tracker = tracker; + this.rpcTimeout = rpcTimeout; } @Override @@ -79,7 +84,13 @@ class MultiServerCallable<R> extends PayloadCarryingServerCallable<MultiResponse } @Override - public MultiResponse call(int callTimeout) throws IOException { + public MultiResponse call(int operationTimeout) throws IOException { + int remainingTime = tracker.getRemainingTime(operationTimeout); + if (remainingTime <= 1) { + // "1" is a special return value in RetryingTimeTracker, see its implementation. + throw new DoNotRetryIOException("Operation Timeout"); + } + int callTimeout = Math.min(rpcTimeout, remainingTime); int countOfActions = this.multiAction.size(); if (countOfActions <= 0) throw new DoNotRetryIOException("No Actions"); MultiRequest.Builder multiRequestBuilder = MultiRequest.newBuilder(); http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java index 24288e6..406928a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java @@ -25,10 +25,11 @@ class RetryingTimeTracker { private long globalStartTime = -1; - public void start() { + public RetryingTimeTracker start() { if (this.globalStartTime < 0) { this.globalStartTime = EnvironmentEdgeManager.currentTime(); } + return this; } public int getRemainingTime(int callTimeout) { 
http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 3a0c08d..d76a99f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -155,14 +155,18 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -174,7 +178,9 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new 
RpcControllerFactory(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } @Override @@ -187,7 +193,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<MultiResponse> createCaller( - PayloadCarryingServerCallable callable) { + PayloadCarryingServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); MultiServerCallable callable1 = (MultiServerCallable) callable; final MultiResponse mr = createMultiResponse( @@ -250,7 +256,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<MultiResponse> createCaller( - PayloadCarryingServerCallable callable) { + PayloadCarryingServerCallable callable, int rpcTimeout) { callsCt.incrementAndGet(); return new CallerWithFailure(ioe); } @@ -287,7 +293,7 @@ public class TestAsyncProcess { @Override protected RpcRetryingCaller<MultiResponse> createCaller( - PayloadCarryingServerCallable payloadCallable) { + PayloadCarryingServerCallable payloadCallable, int rpcTimeout) { MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable; final MultiResponse mr = createMultiResponse( callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() { @@ -1118,7 +1124,9 @@ public class TestAsyncProcess { public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf)); + conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 
7b22ba4..dcac58f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.HRegionInfo; @@ -159,7 +160,9 @@ public class HConnectionTestingUtility { Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf), + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); Mockito.doNothing().when(c).incCount(); Mockito.doNothing().when(c).decCount(); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( http://git-wip-us.apache.org/repos/asf/hbase/blob/d316bf7c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 3307d42..c6482fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -67,12 +67,14 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterBase; +import org.apache.hadoop.hbase.ipc.CallTimeoutException; import org.apache.hadoop.hbase.ipc.RpcClient; import 
org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -127,14 +129,14 @@ public class TestHCM { * This copro sleeps 20 second. The first call it fails. The second time, it works. */ public static class SleepAndFailFirstTime extends BaseRegionObserver { - static final AtomicLong ct = new AtomicLong(0); - static final String SLEEP_TIME_CONF_KEY = - "hbase.coprocessor.SleepAndFailFirstTime.sleepTime"; - static final long DEFAULT_SLEEP_TIME = 20000; - static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); + static final AtomicLong ct = new AtomicLong(0); + static final String SLEEP_TIME_CONF_KEY = + "hbase.coprocessor.SleepAndFailFirstTime.sleepTime"; + static final long DEFAULT_SLEEP_TIME = 20000; + static final AtomicLong sleepTime = new AtomicLong(DEFAULT_SLEEP_TIME); - public SleepAndFailFirstTime() { - } + public SleepAndFailFirstTime() { + } @Override public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) { @@ -145,12 +147,42 @@ public class TestHCM { @Override public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, - final Get get, final List<Cell> results) throws IOException { + final Get get, final List<Cell> results) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + 
Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + } + + @Override + public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, + final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { Threads.sleep(sleepTime.get()); - if (ct.incrementAndGet() == 1){ + if (ct.incrementAndGet() == 1) { throw new IOException("first call I fail"); } } + + @Override + public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, + final Increment increment) throws IOException { + Threads.sleep(sleepTime.get()); + if (ct.incrementAndGet() == 1) { + throw new IOException("first call I fail"); + } + return super.preIncrement(e, increment); + } + } public static class SleepCoprocessor extends BaseRegionObserver { @@ -160,16 +192,26 @@ public class TestHCM { final Get get, final List<Cell> results) throws IOException { Threads.sleep(SLEEP_TIME); } - } - public static class SleepWriteCoprocessor extends BaseRegionObserver { - public static final int SLEEP_TIME = 5000; @Override public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, - final Increment increment) throws IOException { + final Increment increment) throws IOException { Threads.sleep(SLEEP_TIME); return super.preIncrement(e, increment); } + + @Override + public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, + final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + + @Override + public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e, + final Put put, final WALEdit edit, final Durability durability) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver { @@ -358,11 +400,12 @@ public class TestHCM { * timeouted when the server answers. 
*/ @Test - public void testOperationTimeout() throws Exception { - HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout"); + public void testGetOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout"); hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); - HTable table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); + Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration()); table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); // Check that it works if the timeout is big enough table.setOperationTimeout(120 * 1000); table.get(new Get(FAM_NAM)); @@ -385,6 +428,99 @@ public class TestHCM { } } + @Test + public void testPutOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (RetriesExhaustedWithDetailsException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. 
+ LOG.info("We received an exception, as expected ", e); + } catch (IOException e) { + Assert.fail("Wrong exception:" + e.getMessage()); + } finally { + table.close(); + } + } + + @Test + public void testDeleteOperationTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout"); + hdt.addCoprocessor(SleepAndFailFirstTime.class.getName()); + Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration()); + table.setRpcTimeout(Integer.MAX_VALUE); + SleepAndFailFirstTime.ct.set(0); + // Check that it works if the timeout is big enough + table.setOperationTimeout(120 * 1000); + table.delete(new Delete(FAM_NAM)); + + // Resetting and retrying. Will fail this time, not enough time for the second try + SleepAndFailFirstTime.ct.set(0); + try { + table.setOperationTimeout(30 * 1000); + table.delete(new Delete(FAM_NAM)); + Assert.fail("We expect an exception here"); + } catch (IOException e) { + // The client has a CallTimeout class, but it's not shared.We're not very clean today, + // in the general case you can expect the call to stop, but the exception may vary. + // In this test however, we're sure that it will be a socket timeout. 
+ LOG.info("We received an exception, as expected ", e); + } finally { + table.close(); + } + } + @Test + public void testDeleteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Delete d = new Delete(FAM_NAM); + d.addColumn(FAM_NAM, FAM_NAM, 1); + t.delete(d); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + } + + @Test + public void testPutRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + Put p = new Put(FAM_NAM); + p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM); + t.put(p); + fail("Write should not have succeeded"); + } catch (IOException e) { + // expected + } + + } + @Test(expected = RetriesExhaustedException.class) public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); @@ -426,6 +562,7 @@ public class TestHCM { TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close(); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + SleepAndFailFirstTime.ct.set(0); c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000); c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000); @@ -932,8 +1069,7 @@ public class TestHCM { curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, 
ROW).getRegionLocation().getPort()); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, - HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY); table.close(); }