HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout
Signed-off-by: Andrew Purtell <apurt...@apache.org> Amending-Author: Andrew Purtell <apurt...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30d7eeae Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30d7eeae Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30d7eeae Branch: refs/heads/hbase-12439 Commit: 30d7eeaefe431bc263519064662e6dc8ad8ff05a Parents: 4e08a8b Author: Vivek <vkopp...@salesforce.com> Authored: Fri Aug 5 17:25:06 2016 -0700 Committer: Andrew Purtell <apurt...@apache.org> Committed: Sat Aug 6 16:55:09 2016 -0700 ---------------------------------------------------------------------- .../hadoop/hbase/client/AsyncProcess.java | 5 +- .../hbase/client/BufferedMutatorImpl.java | 8 +- .../hbase/client/ConnectionImplementation.java | 3 +- .../org/apache/hadoop/hbase/client/HTable.java | 58 +++++++++---- .../hadoop/hbase/client/HTableMultiplexer.java | 6 +- .../org/apache/hadoop/hbase/client/Table.java | 43 +++++++++- .../hadoop/hbase/client/TestAsyncProcess.java | 11 ++- .../org/apache/hadoop/hbase/HConstants.java | 13 +++ .../hadoop/hbase/rest/client/RemoteHTable.java | 22 +++++ .../hadoop/hbase/client/HTableWrapper.java | 14 +++ .../hbase/client/HConnectionTestingUtility.java | 6 +- .../org/apache/hadoop/hbase/client/TestHCM.java | 90 ++++++++++++++++++-- .../hbase/regionserver/RegionAsTable.java | 14 +++ 13 files changed, 257 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index 4514560..1383ca0 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -281,7 +281,7 @@ class AsyncProcess { public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool, RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors, - RpcControllerFactory rpcFactory) { + RpcControllerFactory rpcFactory, int rpcTimeout) { if (hc == null) { throw new IllegalArgumentException("ClusterConnection cannot be null."); } @@ -297,8 +297,7 @@ class AsyncProcess { // how many times we could try in total, one more than retry number this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1; - this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.timeout = rpcTimeout; this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000); this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS, http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index e98ad4e..39e4f75 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; @@ -72,6 +73,7 @@ public class BufferedMutatorImpl implements BufferedMutator { private final int maxKeyValueSize; private boolean closed = false; private final ExecutorService pool; + private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor @VisibleForTesting protected AsyncProcess ap; // non-final so can be overridden in test @@ -94,8 +96,12 @@ public class BufferedMutatorImpl implements BufferedMutator { this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ? params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize(); + this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + // puts need to track errors globally due to how the APIs currently work. - ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory); + ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8dcda13..04edd25 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1823,7 +1823,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable { // For tests to override. protected AsyncProcess createAsyncProcess(Configuration conf) { // No default pool available. - return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory); + int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout); } @Override http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index fbd9f51..882e21b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -112,7 +112,8 @@ public class HTable implements Table { protected long scannerMaxResultSize; private ExecutorService pool; // For Multi & Scan private int operationTimeout; // global timeout for each blocking method with retrying rpc - private int rpcTimeout; // timeout for each rpc request + private int readRpcTimeout; // timeout for each read rpc request + private int writeRpcTimeout; // timeout for each write rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() private Consistency defaultConsistency = Consistency.STRONG; @@ -212,8 +213,12 @@ public class HTable implements Table { this.operationTimeout = tableName.isSystemTable() ? connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout(); - this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, - HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); this.scannerCaching = connConfiguration.getScannerCaching(); this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize(); if (this.rpcCallerFactory == null) { @@ -257,7 +262,7 @@ public class HTable implements Table { @Override public HTableDescriptor getTableDescriptor() throws IOException { HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, - rpcControllerFactory, operationTimeout, rpcTimeout); + rpcControllerFactory, operationTimeout, readRpcTimeout); if (htd != null) { return new UnmodifyableHTableDescriptor(htd); } @@ -430,7 +435,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -528,7 +533,7 @@ public class HTable implements Table { } } }; - rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -654,7 +659,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -686,7 +691,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -742,7 +747,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -772,7 +777,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -803,7 +808,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -833,7 +838,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -864,7 +869,7 @@ public class HTable implements Table { } } }; - return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable, this.operationTimeout); } @@ -1196,13 +1201,34 @@ public class HTable implements Table { } @Override + @Deprecated public int getRpcTimeout() { - return rpcTimeout; + return readRpcTimeout; } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { - this.rpcTimeout = rpcTimeout; + this.readRpcTimeout = rpcTimeout; + this.writeRpcTimeout = rpcTimeout; + } + + @Override + public int getWriteRpcTimeout() { + return writeRpcTimeout; + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + this.writeRpcTimeout = writeRpcTimeout; + } + + @Override + public int getReadRpcTimeout() { return readRpcTimeout; } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + this.readRpcTimeout = readRpcTimeout; } @Override @@ -1282,7 +1308,7 @@ public class HTable implements Table { AsyncProcess asyncProcess = new AsyncProcess(connection, configuration, pool, RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()), - true, RpcControllerFactory.instantiate(configuration)); + true, RpcControllerFactory.instantiate(configuration), readRpcTimeout); AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs, new Callback<ClientProtos.CoprocessorServiceResult>() { http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java index f1bbcb3..ba963c2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java @@ -442,6 +442,7 @@ public class HTableMultiplexer { private final ScheduledExecutorService executor; private final int maxRetryInQueue; private final AtomicInteger retryInQueue = new AtomicInteger(0); + private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr, HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize, @@ -451,7 +452,10 @@ public class HTableMultiplexer { this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize); RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf); RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf); - this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory); + this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, + conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout); this.executor = executor; this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000); } http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java index f2cec97..4d93442 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java @@ -585,17 +585,56 @@ public interface Table extends Closeable { int getOperationTimeout(); /** + * Get timeout (millisecond) of each rpc request in this Table instance. + * + * @returns Currently configured read timeout + * @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead + */ + @Deprecated + int getRpcTimeout(); + + /** * Set timeout (millisecond) of each rpc request in operations of this Table instance, will * override the value of hbase.rpc.timeout in configuration. * If a rpc request waiting too long, it will stop waiting and send a new request to retry until * retries exhausted or operation timeout reached. + * <p> + * NOTE: This will set both the read and write timeout settings to the provided value. + * * @param rpcTimeout the timeout of each rpc request in millisecond. + * + * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead */ + @Deprecated void setRpcTimeout(int rpcTimeout); /** - * Get timeout (millisecond) of each rpc request in this Table instance. + * Get timeout (millisecond) of each rpc read request in this Table instance. */ - int getRpcTimeout(); + int getReadRpcTimeout(); + + /** + * Set timeout (millisecond) of each rpc read request in operations of this Table instance, will + * override the value of hbase.rpc.read.timeout in configuration. + * If a rpc read request waiting too long, it will stop waiting and send a new request to retry + * until retries exhausted or operation timeout reached. + * + * @param readRpcTimeout + */ + void setReadRpcTimeout(int readRpcTimeout); + /** + * Get timeout (millisecond) of each rpc write request in this Table instance. + */ + int getWriteRpcTimeout(); + + /** + * Set timeout (millisecond) of each rpc write request in operations of this Table instance, will + * override the value of hbase.rpc.write.timeout in configuration. + * If a rpc write request waiting too long, it will stop waiting and send a new request to retry + * until retries exhausted or operation timeout reached. + * + * @param writeRpcTimeout + */ + void setWriteRpcTimeout(int writeRpcTimeout); } http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java index d943316..0aa9704 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java @@ -138,6 +138,7 @@ public class TestAsyncProcess { final AtomicInteger nbActions = new AtomicInteger(); public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>(); public AtomicInteger callsCt = new AtomicInteger(); + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); @Override protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName, @@ -157,14 +158,14 @@ public class TestAsyncProcess { public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)), - new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess( ClusterConnection hc, Configuration conf, boolean useGlobalErrors) { super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())), - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors, @@ -176,7 +177,7 @@ public class TestAsyncProcess { throw new RejectedExecutionException("test under failure"); } }, - new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf)); + new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout); } @Override @@ -1111,10 +1112,12 @@ public class TestAsyncProcess { } static class AsyncProcessForThrowableCheck extends AsyncProcess { + private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf, ExecutorService pool) { super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory( - conf)); + conf), rpcTimeout); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index 256c374..ce18ef5 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -815,10 +815,23 @@ public final class HConstants { /** * timeout for each RPC + * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY} + * instead. */ + @Deprecated public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout"; /** + * timeout for each read RPC + */ + public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout"; + + /** + * timeout for each write RPC + */ + public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout"; + + /** * Default value of {@link #HBASE_RPC_TIMEOUT_KEY} */ public static final int DEFAULT_HBASE_RPC_TIMEOUT = 60000; http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java ---------------------------------------------------------------------- diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java index b9e393e..33c2fc2 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java @@ -860,12 +860,34 @@ public class RemoteHTable implements Table { } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { throw new UnsupportedOperationException(); } @Override + @Deprecated public int getRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public int getReadRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { + throw new UnsupportedOperationException(); + } + + @Override + public int getWriteRpcTimeout() { + throw new UnsupportedOperationException(); + } + + @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { + throw new UnsupportedOperationException(); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java index 5da0df7..6a73261 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java @@ -308,12 +308,26 @@ public final class HTableWrapper implements Table { } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { table.setRpcTimeout(rpcTimeout); } @Override + public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); } + + @Override + @Deprecated public int getRpcTimeout() { return table.getRpcTimeout(); } + + @Override + public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); } + + @Override + public int getReadRpcTimeout() { return table.getReadRpcTimeout(); } } http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java index 265e3c1..036b38f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java @@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; @@ -126,7 +127,8 @@ public class HConnectionTestingUtility { Mockito.when(c.getNonceGenerator()).thenReturn(ng); Mockito.when(c.getAsyncProcess()).thenReturn( new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false, - RpcControllerFactory.instantiate(conf))); + RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT))); Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn( RpcRetryingCallerFactory.instantiate(conf, RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null)); @@ -194,4 +196,4 @@ public class HConnectionTestingUtility { return result; } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 1b20b76..4d47bde 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -18,11 +18,7 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.IOException; import java.lang.reflect.Field; @@ -149,6 +145,16 @@ public class TestHCM { } } + public static class SleepWriteCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 5000; + @Override + public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e, + final Increment increment) throws IOException { + Threads.sleep(SLEEP_TIME); + return super.preIncrement(e, increment); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); @@ -351,7 +357,7 @@ public class TestHCM { } } - @Test(expected = RetriesExhaustedException.class) + @Test public void testRpcTimeout() throws Exception { HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); hdt.addCoprocessor(SleepCoprocessor.class.getName()); @@ -361,6 +367,78 @@ public class TestHCM { t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + } + } + + @Test + public void testWriteRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout"); + hdt.addCoprocessor(SleepWriteCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100); + Increment i = new Increment(FAM_NAM); + i.addColumn(FAM_NAM, FAM_NAM, 1); + t.increment(i); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + Increment i = new Increment(FAM_NAM); + i.addColumn(FAM_NAM, FAM_NAM, 1); + t.increment(i); + fail("Write should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + } + } + + @Test + public void testReadRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + + try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) { + t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2); + t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100); + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } + + // Again, with configuration based override + c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + try (Connection conn = ConnectionFactory.createConnection(c)) { + try (Table t = conn.getTable(hdt.getTableName())) { + t.get(new Get(FAM_NAM)); + fail("Get should not have succeeded"); + } catch (RetriesExhaustedException e) { + // expected + } } } http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java index 770c39b..d2e78b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java @@ -333,12 +333,26 @@ public class RegionAsTable implements Table { } @Override + @Deprecated public void setRpcTimeout(int rpcTimeout) { throw new UnsupportedOperationException(); } @Override + public void setWriteRpcTimeout(int writeRpcTimeout) {throw new UnsupportedOperationException(); } + + @Override + public void setReadRpcTimeout(int readRpcTimeout) {throw new UnsupportedOperationException(); } + + @Override + @Deprecated public int getRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public int getWriteRpcTimeout() { throw new UnsupportedOperationException(); } + + @Override + public int getReadRpcTimeout() { throw new UnsupportedOperationException(); } } \ No newline at end of file