Repository: hbase Updated Branches: refs/heads/0.98 ee344b27f -> 342569620
HBASE-16352 Port HBASE-15645 (hbase.rpc.timeout is not used in operations of HTable) to 0.98 Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34256962 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34256962 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34256962 Branch: refs/heads/0.98 Commit: 3425696209b7c17aff4508985e0e316a9da57c5a Parents: ee344b2 Author: Andrew Purtell <[email protected]> Authored: Wed Aug 3 18:17:24 2016 -0700 Committer: Andrew Purtell <[email protected]> Committed: Thu Aug 4 10:23:56 2016 -0700 ---------------------------------------------------------------------- .../apache/hadoop/hbase/client/HBaseAdmin.java | 48 +++++++++++++++++- .../org/apache/hadoop/hbase/client/HTable.java | 53 ++++++++++++++------ .../hadoop/hbase/client/RpcRetryingCaller.java | 23 ++++++--- .../hbase/client/RpcRetryingCallerFactory.java | 20 +++++++- .../org/apache/hadoop/hbase/HConstants.java | 4 +- .../src/main/resources/hbase-default.xml | 11 +++- .../org/apache/hadoop/hbase/client/TestHCM.java | 29 +++++++++++ 7 files changed, 159 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index e3fcc6e..745da57 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -67,7 +67,9 @@ import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.MergeRegionException; import 
org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService; @@ -175,8 +177,11 @@ public class HBaseAdmin implements Abortable, Closeable { private boolean aborted; private boolean cleanupConnectionOnClose = false; // close the connection in close() private boolean closed = false; + private int operationTimeout; + private int rpcTimeout; private RpcRetryingCallerFactory rpcCallerFactory; + private RpcControllerFactory rpcControllerFactory; /** * Constructor. @@ -211,6 +216,11 @@ public class HBaseAdmin implements Abortable, Closeable { HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER); this.retryLongerMultiplier = this.conf.getInt( "hbase.client.retries.longer.multiplier", 10); + this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, + HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.rpcTimeout = this.conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); + this.rpcControllerFactory = RpcControllerFactory.instantiate(this.conf); this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf, connection.getStatisticsTracker()); } @@ -420,7 +430,35 @@ public class HBaseAdmin implements Abortable, Closeable { */ public HTableDescriptor getTableDescriptor(final TableName tableName) throws TableNotFoundException, IOException { - return this.connection.getHTableDescriptor(tableName); + return getTableDescriptor(tableName, getConnection(), rpcCallerFactory, rpcControllerFactory, + operationTimeout, rpcTimeout); + } + + static HTableDescriptor getTableDescriptor(final 
TableName tableName, HConnection connection, + RpcRetryingCallerFactory rpcCallerFactory, final RpcControllerFactory rpcControllerFactory, + int operationTimeout, int rpcTimeout) throws TableNotFoundException, IOException { + + if (tableName == null) return null; + HTableDescriptor htd = executeCallable(new MasterCallable<HTableDescriptor>(connection) { + @Override + public HTableDescriptor call() throws ServiceException { + PayloadCarryingRpcController controller = rpcControllerFactory.newController(); + controller.setPriority(tableName); + GetTableDescriptorsResponse htds; + GetTableDescriptorsRequest req = + RequestConverter.buildGetTableDescriptorsRequest(tableName); + htds = master.getTableDescriptors(controller, req); + + if (!htds.getTableSchemaList().isEmpty()) { + return HTableDescriptor.convert(htds.getTableSchemaList().get(0)); + } + return null; + } + }, rpcCallerFactory, operationTimeout, rpcTimeout); + if (htd != null) { + return htd; + } + throw new TableNotFoundException(tableName.getNameAsString()); } public HTableDescriptor getTableDescriptor(final byte[] tableName) @@ -3474,7 +3512,13 @@ public class HBaseAdmin implements Abortable, Closeable { } private <V> V executeCallable(MasterCallable<V> callable) throws IOException { - RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(); + return executeCallable(callable, rpcCallerFactory, operationTimeout, rpcTimeout); + } + + private static <V> V executeCallable(MasterCallable<V> callable, + RpcRetryingCallerFactory rpcCallerFactory, int operationTimeout, int rpcTimeout) + throws IOException { + RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller(rpcTimeout); try { return caller.callWithRetries(callable); } finally { http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java ---------------------------------------------------------------------- diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java index 805cecd..ada451c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java @@ -138,7 +138,8 @@ public class HTable implements HTableInterface { protected long scannerMaxResultSize; private ExecutorService pool; // For Multi private boolean closed; - private int operationTimeout; + private int operationTimeout; // global timeout for each blocking method with retrying rpc + private int rpcTimeout; // timeout for each rpc request private final boolean cleanupPoolOnClose; // shutdown the pool in close() private final boolean cleanupConnectionOnClose; // close the connection in close() @@ -380,7 +381,8 @@ public class HTable implements HTableInterface { this.currentWriteBufferSize = 0; this.scannerCaching = tableConfiguration.getScannerCaching(); this.scannerMaxResultSize = tableConfiguration.getScannerMaxResultSize(); - + this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, + HConstants.DEFAULT_HBASE_RPC_TIMEOUT); if (this.rpcCallerFactory == null) { this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(configuration, this.connection.getStatisticsTracker()); @@ -592,8 +594,12 @@ public class HTable implements HTableInterface { */ @Override public HTableDescriptor getTableDescriptor() throws IOException { - return new UnmodifyableHTableDescriptor( - this.connection.getHTableDescriptor(this.tableName)); + HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory, + rpcControllerFactory, operationTimeout, rpcTimeout); + if (htd != null) { + return new UnmodifyableHTableDescriptor(htd); + } + return null; } /** @@ -748,8 +754,9 @@ public class HTable implements HTableInterface { .getRegionName(), row, family, rpcControllerFactory.newController()); } }; - return 
rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); - } + return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); + } /** * {@inheritDoc} @@ -832,7 +839,8 @@ public class HTable implements HTableInterface { getReq, controller); } }; - return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -927,7 +935,8 @@ public class HTable implements HTableInterface { } } }; - rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout); + rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1067,7 +1076,11 @@ public class HTable implements HTableInterface { if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); } - RpcClient.setRpcTimeout(remainingTime); + int timeout = remainingTime; + if (rpcTimeout > 0 && rpcTimeout < timeout) { + timeout = rpcTimeout; + } + RpcClient.setRpcTimeout(timeout); try { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( getLocation().getRegionInfo().getRegionName(), rm); @@ -1125,7 +1138,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1154,7 +1168,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory.<Result> newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1219,7 +1234,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory.<Long> 
newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1246,10 +1262,10 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } - /** * {@inheritDoc} */ @@ -1274,7 +1290,8 @@ public class HTable implements HTableInterface { } } }; - return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable, this.operationTimeout); + return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable, + this.operationTimeout); } /** @@ -1296,7 +1313,11 @@ public class HTable implements HTableInterface { if (remainingTime == 0) { throw new DoNotRetryIOException("Timeout for mutate row"); } - RpcClient.setRpcTimeout(remainingTime); + int timeout = remainingTime; + if (rpcTimeout > 0 && rpcTimeout < timeout){ + timeout = rpcTimeout; + } + RpcClient.setRpcTimeout(timeout); try { RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction( getLocation().getRegionInfo().getRegionName(), rm); http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java index 8b1713f..086b031 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java @@ -54,20 +54,25 @@ public class RpcRetryingCaller<T> { * When we started making calls. 
*/ private long globalStartTime; - /** - * Start and end times for a single call. - */ + private final static int MIN_RPC_TIMEOUT = 2000; + /** How many retries are allowed before we start to log */ private final int startLogErrorsCnt; private final long pause; private final int retries; + private final int rpcTimeout;// timeout for each rpc request public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt) { + this(pause, retries, startLogErrorsCnt, 0); + } + + public RpcRetryingCaller(long pause, int retries, int startLogErrorsCnt, int rpcTimeout) { this.pause = pause; this.retries = retries; this.startLogErrorsCnt = startLogErrorsCnt; + this.rpcTimeout = rpcTimeout; } private void beforeCall() { @@ -76,10 +81,15 @@ public class RpcRetryingCaller<T> { if (remaining < MIN_RPC_TIMEOUT) { // If there is no time left, we're trying anyway. It's too late. // 0 means no timeout, and it's not the intent here. So we secure both cases by - // resetting to the minimum. + // setting remaining to MIN_RPC_TIMEOUT remaining = MIN_RPC_TIMEOUT; } - RpcClient.setRpcTimeout(remaining); + int timeout = remaining; + // If we have a nonzero setting for RPC timeout, use it + if (rpcTimeout > 0 && rpcTimeout < timeout){ + timeout = rpcTimeout; + } + RpcClient.setRpcTimeout(timeout); } private void afterCall() { @@ -158,8 +168,9 @@ public class RpcRetryingCaller<T> { * @return Calculate how long a single call took */ private long singleCallDuration(final long expectedSleep) { + int timeout = rpcTimeout > 0 ? 
rpcTimeout : MIN_RPC_TIMEOUT; return (EnvironmentEdgeManager.currentTimeMillis() - this.globalStartTime) - + MIN_RPC_TIMEOUT + expectedSleep; + + timeout + expectedSleep; } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java index 1f83b0d..70c87c9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java @@ -33,6 +33,7 @@ public class RpcRetryingCallerFactory { protected final Configuration conf; private final long pause; private final int retries; + private final int rpcTimeout; private final int startLogErrorsCnt; private final boolean enableBackPressure; private ServerStatisticTracker stats; @@ -51,7 +52,8 @@ public class RpcRetryingCallerFactory { startLogErrorsCnt = conf.getInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, AsyncProcess.DEFAULT_START_LOG_ERRORS_AFTER_COUNT); enableBackPressure = conf.getBoolean(HConstants.ENABLE_CLIENT_BACKPRESSURE, - HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + HConstants.DEFAULT_ENABLE_CLIENT_BACKPRESSURE); + rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,HConstants.DEFAULT_HBASE_RPC_TIMEOUT); } /** @@ -61,11 +63,25 @@ public class RpcRetryingCallerFactory { this.stats = statisticTracker; } + /** + * Create a new RetryingCaller with specific rpc timeout. + */ + public <T> RpcRetryingCaller<T> newCaller(int rpcTimeout) { + // We store the values in the factory instance. This way, constructing new objects + // is cheap as it does not require parsing a complex structure. 
+ RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, + startLogErrorsCnt, rpcTimeout); + return caller; + } + + /** + * Create a new RetryingCaller with configured rpc timeout. + */ public <T> RpcRetryingCaller<T> newCaller() { // We store the values in the factory instance. This way, constructing new objects // is cheap as it does not require parsing a complex structure. RpcRetryingCaller<T> caller = new RpcRetryingCaller<T>(pause, retries, - startLogErrorsCnt); + startLogErrorsCnt, rpcTimeout); return caller; } http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index dfb9abc..e5dc50d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -272,10 +272,10 @@ public final class HConstants { /** Parameter name for HBase client IPC pool size */ public static final String HBASE_CLIENT_IPC_POOL_SIZE = "hbase.client.ipc.pool.size"; - /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + /** Parameter name for HBase client operation timeout. */ public static final String HBASE_CLIENT_OPERATION_TIMEOUT = "hbase.client.operation.timeout"; - /** Parameter name for HBase client operation timeout, which overrides RPC timeout */ + /** Parameter name for HBase client operation timeout. 
*/ public static final String HBASE_CLIENT_META_OPERATION_TIMEOUT = "hbase.client.meta.operation.timeout"; http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-common/src/main/resources/hbase-default.xml ---------------------------------------------------------------------- diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 4761df4..38b6939 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -770,11 +770,20 @@ possible configurations would overwhelm and obscure the important. <property> <name>hbase.rpc.timeout</name> <value>60000</value> - <description>This is for the RPC layer to define how long HBase client applications + <description>This is for the RPC layer to define how long (in milliseconds) HBase client applications take for a remote call to time out. It uses pings to check connections but will eventually throw a TimeoutException.</description> </property> <property> + <name>hbase.client.operation.timeout</name> + <value>1200000</value> + <description>Operation timeout is a top-level restriction (in milliseconds) that ensures a + blocking operation in Table is not blocked for longer than this. In each operation, if an RPC + request fails because of a timeout or another reason, it is retried until it succeeds or a + RetriesExhaustedException is thrown. But if the total blocking time reaches the operation timeout + before the retries are exhausted, the operation breaks early and throws SocketTimeoutException.</description> + </property> + <property> <name>hbase.rpc.shortoperation.timeout</name> <value>10000</value> <description>This is another version of "hbase.rpc.timeout".
For those RPC operation http://git-wip-us.apache.org/repos/asf/hbase/blob/34256962/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java index 1f1c9e4..5cffb12 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java @@ -43,16 +43,21 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.HConnectionManager.HConnectionImplementation; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.filter.Filter; @@ -98,9 +103,19 @@ public class TestHCM { private static final byte[] ROW_X = Bytes.toBytes("xxx"); private static Random _randy = new Random(); + public static class SleepCoprocessor extends BaseRegionObserver { + public static final int SLEEP_TIME = 5000; 
+ @Override + public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e, + final Get get, final List<Cell> results) throws IOException { + Threads.sleep(SLEEP_TIME); + } + } + @BeforeClass public static void setUpBeforeClass() throws Exception { TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); TEST_UTIL.startMiniCluster(2); } @@ -229,6 +244,20 @@ public class TestHCM { hci.getClient(sn); // will throw an exception: RegionServerStoppedException } + @Test(expected = RetriesExhaustedException.class) + public void testRpcTimeout() throws Exception { + HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout"); + hdt.addCoprocessor(SleepCoprocessor.class.getName()); + Configuration c = new Configuration(TEST_UTIL.getConfiguration()); + c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2); + HTable table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c); + try { + table.get(new Get(FAM_NAM)); + } finally { + table.close(); + } + } + /** * Test that the connection to the dead server is cut immediately when we receive the * notification.
