Repository: hbase Updated Branches: refs/heads/branch-1 6b233c433 -> ec99838b9 refs/heads/master 4e08a8bec -> 30d7eeaef
HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout Signed-off-by: Andrew Purtell <[email protected]> Amending-Author: Andrew Purtell <[email protected]> Conflicts: hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ec99838b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ec99838b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ec99838b Branch: refs/heads/branch-1 Commit: ec99838b9c5ac931b16e7cc3c7c2b7bf5b024c0c Parents: 6b233c4 Author: Vivek <[email protected]> Authored: Fri Aug 5 17:25:06 2016 -0700 Committer: Andrew Purtell <[email protected]> Committed: Sat Aug 6 10:38:41 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 6 +- .../hbase/client/BufferedMutatorImpl.java | 8 +- .../hadoop/hbase/client/ConnectionManager.java | 7 +- .../org/apache/hadoop/hbase/client/HTable.java | 66 +++++++++---- .../hadoop/hbase/client/HTableMultiplexer.java | 6 +- .../apache/hadoop/hbase/client/HTablePool.java | 28 +++++- .../org/apache/hadoop/hbase/client/Table.java | 46 ++++++++- .../hadoop/hbase/client/TestAsyncProcess.java | 11 ++- .../org/apache/hadoop/hbase/HConstants.java | 13 +++ .../hadoop/hbase/rest/client/RemoteHTable.java | 28 +++++- .../hadoop/hbase/client/HTableWrapper.java | 22 ++++- .../hbase/client/HConnectionTestingUtility.java | 4 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 99 +++++++++++++++++--- 13 files changed, 291 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 209ec0e..780de18 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -280,7 +280,8 @@ class AsyncProcess { } public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, - RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory) { + RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, RpcControllerFactory rpcFactory, + int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("HConnection cannot be null."); } @@ -295,8 +296,7 @@ class AsyncProcess { HConstants.DEFAULT_HBASE_CLIENT_PAUSE); this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.timeout = rpcTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 273f2e4..37a38be 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -71,6 +72,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; + private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -93,8 +95,12 @@ public class BufferedMutatorImpl implements BufferedMutator { this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java index 96ebebc..50ac87d 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java @@ -560,6 +560,7 @@ class ConnectionManager { private final boolean useMetaReplicas; private final int numTries; final int rpcTimeout; + final int writeRpcTimeout; private NonceGenerator nonceGenerator = null; private final AsyncProcess asyncProcess; // single tracker per connection @@ -654,6 +655,9 @@ class ConnectionManager { this.rpcTimeout = conf.getInt( HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.writeRpcTimeout = conf.getInt( + HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) { synchronized (nonceGeneratorCreateLock) { if (ConnectionManager.nonceGenerator == null) { @@ -2340,7 +2344,8 @@ class ConnectionManager { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, + writeRpcTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 4cd11d0..a793f3e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -125,7 +125,8 @@ public class HTable implements HTableInterface, RegionLocator { protected long scannerMaxResultSize; private ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc - private int rpcTimeout; // timeout for each rpc request + private int readRpcTimeout; // timeout for each read rpc request + private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; @@ -356,8 +357,12 @@ public class HTable implements HTableInterface, RegionLocator { } this.operationTimeout = tableName.isSystemTable() ? connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { @@ -573,7 +578,7 @@ public class HTable implements HTableInterface, RegionLocator { @Override public HTableDescriptor getTableDescriptor() throws IOException { HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, - rpcControllerFactory, operationTimeout, rpcTimeout); + rpcControllerFactory, operationTimeout, readRpcTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -755,7 +760,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -861,7 +866,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -978,7 +983,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1108,7 +1113,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1140,7 +1145,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1211,7 +1216,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1241,7 +1246,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1272,7 +1277,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1302,7 +1307,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1333,7 +1338,7 @@ public class HTable implements HTableInterface, RegionLocator { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1806,12 +1811,35 @@ public class HTable implements HTableInterface, RegionLocator { return operationTimeout; } - @Override public void setRpcTimeout(int rpcTimeout) { - this.rpcTimeout = rpcTimeout; + @Override + @Deprecated + public int getRpcTimeout() { + return readRpcTimeout; + } + + @Override + @Deprecated + public void setRpcTimeout(int rpcTimeout) { + this.readRpcTimeout = rpcTimeout; + this.writeRpcTimeout = rpcTimeout; } - @Override public int getRpcTimeout() { - return rpcTimeout; + @Override + public int getWriteRpcTimeout() { + return writeRpcTimeout; + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + this.writeRpcTimeout = writeRpcTimeout; + } + + @Override + public int getReadRpcTimeout() { return readRpcTimeout; } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + this.readRpcTimeout = readRpcTimeout; } @Override @@ -1891,7 +1919,7 @@ public class HTable implements HTableInterface, RegionLocator { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback<ClientProtos.CoprocessorServiceResult>() { http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index dfb0104..7b2b136 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -441,6 +441,7 @@ public class HTableMultiplexer { private final ScheduledExecutorService executor; private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); + private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -450,7 +451,10 @@ public class HTableMultiplexer { this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java index 2d18367..502703b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTablePool.java @@ -678,12 +678,36 @@ public class HTablePool implements Closeable { return table.getOperationTimeout(); } - @Override public void setRpcTimeout(int rpcTimeout) { + @Override + @Deprecated + public void setRpcTimeout(int rpcTimeout) { table.setRpcTimeout(rpcTimeout); } - @Override public int getRpcTimeout() { + @Override + @Deprecated + public int getRpcTimeout() { return table.getRpcTimeout(); } + + @Override + public int getReadRpcTimeout() { + return table.getReadRpcTimeout(); + } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + table.setReadRpcTimeout(readRpcTimeout); + } + + @Override + public int getWriteRpcTimeout() { + return table.getWriteRpcTimeout(); + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + table.setWriteRpcTimeout(writeRpcTimeout); + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index 8c6169d..1a6572b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -614,16 +614,56 @@ public interface Table extends Closeable { public int getOperationTimeout(); /** + * Get timeout (millisecond) of each rpc request in this Table instance. + * + * @returns Currently configured read timeout + * @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead + */ + @Deprecated + int getRpcTimeout(); + + /** * Set timeout (millisecond) of each rpc request in operations of this Table instance, will * override the value of hbase.rpc.timeout in configuration. * If a rpc request waiting too long, it will stop waiting and send a new request to retry until * retries exhausted or operation timeout reached. + * <p> + * NOTE: This will set both the read and write timeout settings to the provided value. + * * @param rpcTimeout the timeout of each rpc request in millisecond. + * + * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead */ - public void setRpcTimeout(int rpcTimeout); + @Deprecated + void setRpcTimeout(int rpcTimeout); /** - * Get timeout (millisecond) of each rpc request in this Table instance. + * Get timeout (millisecond) of each rpc read request in this Table instance. + */ + int getReadRpcTimeout(); + + /** + * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will + * override the value of hbase.rpc.read.timeout in configuration. + * If a rpc read request waiting too long, it will stop waiting and send a new request to retry + * until retries exhausted or operation timeout reached. + * + * @param readRpcTimeout + */ + void setReadRpcTimeout(int readRpcTimeout); + + /** + * Get timeout (millisecond) of each rpc write request in this Table instance. + */ + int getWriteRpcTimeout(); + + /** + * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will + * override the value of hbase.rpc.write.timeout in configuration. + * If a rpc write request waiting too long, it will stop waiting and send a new request to retry + * until retries exhausted or operation timeout reached. + * + * @param writeRpcTimeout */ - public int getRpcTimeout(); + void setWriteRpcTimeout(int writeRpcTimeout); } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index 3a0c08d..8fb2e8b 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -135,6 +135,7 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>(); public AtomicInteger callsCt = new AtomicInteger(); + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @Override protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, @@ -155,14 +156,14 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -174,7 +175,7 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } @Override @@ -1115,10 +1116,12 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf)); + conf), rpcTimeout); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 8cb937a..654c012 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -799,10 +799,23 @@ public final class HConstants { /** * timeout for each RPC + * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY} + * instead. */ + @Deprecated public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; /** + * timeout for each read RPC + */ + public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout"; + + /** + * timeout for each write RPC + */ + public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout"; + + /** * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} */ public static final int DEFAULT_HBASE_RPC_TIMEOUT = 60000; http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index 8fa1b8a..63b18b1 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -866,11 +866,35 @@ public class RemoteHTable implements Table { throw new UnsupportedOperationException(); } - @Override public void setRpcTimeout(int rpcTimeout) { + @Override + @Deprecated + public void setRpcTimeout(int rpcTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + @Deprecated + public int getRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public int getReadRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { throw new UnsupportedOperationException(); } - @Override public int getRpcTimeout() { + @Override + public int getWriteRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { throw new UnsupportedOperationException(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 2d25f63..dc27240 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -374,11 +374,25 @@ public class HTableWrapper implements HTableInterface { return table.getOperationTimeout(); } - @Override public void setRpcTimeout(int rpcTimeout) { + @Override + @Deprecated + public int getRpcTimeout() { return table.getRpcTimeout(); } + + @Override + @Deprecated + public void setRpcTimeout(int rpcTimeout) { table.setRpcTimeout(rpcTimeout); } - @Override public int getRpcTimeout() { - return table.getRpcTimeout(); - } + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); } + + @Override + public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); } + + @Override + public int getReadRpcTimeout() { return table.getReadRpcTimeout(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 1a7c2ef..8cadbc8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -154,7 +155,8 @@ public class HConnectionTestingUtility { Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); Mockito.doNothing().when(c).incCount(); Mockito.doNothing().when(c).decCount(); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( http://git-wip-us.apache.org/repos/asf/hbase/blob/ec99838b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 0d9efa2..443cd12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,12 +18,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; +import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Field; @@ -155,6 +150,16 @@ public class TestHCM { } } + public static class SleepWriteCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 5000; + @Override + public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, + final Increment increment) throws IOException { + Threads.sleep(SLEEP_TIME); + return super.preIncrement(e, increment); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); @@ -348,18 +353,88 @@ public class TestHCM { } } - @Test(expected = RetriesExhaustedException.class) + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); Configuration c = new Configuration(TEST_UTIL.getConfiguration()); try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { - assert t instanceof HTable; - HTable table = (HTable) t; - table.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); - table.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); - table.get(new Get(FAM_NAM)); + t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + } + } + + @Test + public void testWriteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); + hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + Increment i = new Increment(FAM_NAM); + i.addColumn(FAM_NAM, FAM_NAM, 1); + t.increment(i); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + Increment i = new Increment(FAM_NAM); + i.addColumn(FAM_NAM, FAM_NAM, 1); + t.increment(i); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + } + } + + @Test + public void testReadRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } } }
