This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new f9f6354 HBASE-22322 Use special pause for CallQueueTooBigException
f9f6354 is described below
commit f9f63543933616ca887fa2dc954dfd7e649d0461
Author: Duo Zhang <[email protected]>
AuthorDate: Mon Apr 29 10:20:33 2019 +0800
HBASE-22322 Use special pause for CallQueueTooBigException
---
.../hadoop/hbase/client/AsyncAdminBuilder.java | 22 ++-
.../hadoop/hbase/client/AsyncAdminBuilderBase.java | 9 +
.../client/AsyncAdminRequestRetryingCaller.java | 8 +-
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 24 ++-
.../hadoop/hbase/client/AsyncClientScanner.java | 29 +--
.../hbase/client/AsyncConnectionConfiguration.java | 23 ++-
.../AsyncMasterRequestRpcRetryingCaller.java | 8 +-
.../hbase/client/AsyncRpcRetryingCaller.java | 13 +-
.../client/AsyncRpcRetryingCallerFactory.java | 50 ++++-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 12 +-
.../AsyncServerRequestRpcRetryingCaller.java | 8 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 8 +-
.../hadoop/hbase/client/AsyncTableBuilder.java | 13 ++
.../hadoop/hbase/client/AsyncTableBuilderBase.java | 9 +
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 36 ++--
.../hadoop/hbase/client/RawAsyncTableImpl.java | 26 ++-
.../TestAsyncClientPauseForCallQueueTooBig.java | 204 +++++++++++++++++++++
17 files changed, 422 insertions(+), 80 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
index 6a8db9e..49bc350 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilder.java
@@ -36,16 +36,12 @@ public interface AsyncAdminBuilder {
* Set timeout for a whole admin operation. Operation timeout and max
attempt times(or max retry
* times) are both limitations for retrying, we will stop retrying when we
reach any of the
* limitations.
- * @param timeout
- * @param unit
* @return this for invocation chaining
*/
AsyncAdminBuilder setOperationTimeout(long timeout, TimeUnit unit);
/**
* Set timeout for each rpc request.
- * @param timeout
- * @param unit
* @return this for invocation chaining
*/
AsyncAdminBuilder setRpcTimeout(long timeout, TimeUnit unit);
@@ -53,17 +49,27 @@ public interface AsyncAdminBuilder {
/**
* Set the base pause time for retrying. We use an exponential policy to
generate sleep time when
* retrying.
- * @param timeout
- * @param unit
* @return this for invocation chaining
+ * @see #setRetryPauseForCQTBE(long, TimeUnit)
*/
AsyncAdminBuilder setRetryPause(long timeout, TimeUnit unit);
/**
+ * Set the base pause time for retrying when we hit {@code
CallQueueTooBigException}. We use an
+ * exponential policy to generate sleep time when retrying.
+ * <p/>
+ * This value should be greater than the normal pause value which could be
set with the above
+ * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code
CallQueueTooBigException}
+ * means the server is overloaded. We just use the normal pause value for
+ * {@code CallQueueTooBigException} if here you specify a smaller value.
+ * @see #setRetryPause(long, TimeUnit)
+ */
+ AsyncAdminBuilder setRetryPauseForCQTBE(long pause, TimeUnit unit);
+
+ /**
* Set the max retry times for an admin operation. Usually it is the max
attempt times minus 1.
* Operation timeout and max attempt times(or max retry times) are both
limitations for retrying,
* we will stop retrying when we reach any of the limitations.
- * @param maxRetries
* @return this for invocation chaining
*/
default AsyncAdminBuilder setMaxRetries(int maxRetries) {
@@ -74,14 +80,12 @@ public interface AsyncAdminBuilder {
* Set the max attempt times for an admin operation. Usually it is the max
retry times plus 1.
* Operation timeout and max attempt times(or max retry times) are both
limitations for retrying,
* we will stop retrying when we reach any of the limitations.
- * @param maxAttempts
* @return this for invocation chaining
*/
AsyncAdminBuilder setMaxAttempts(int maxAttempts);
/**
* Set the number of retries that are allowed before we start to log.
- * @param startLogErrorsCnt
* @return this for invocation chaining
*/
AsyncAdminBuilder setStartLogErrorsCnt(int startLogErrorsCnt);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
index 00896ef..ffb3ae9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminBuilderBase.java
@@ -33,6 +33,8 @@ abstract class AsyncAdminBuilderBase implements
AsyncAdminBuilder {
protected long pauseNs;
+ protected long pauseForCQTBENs;
+
protected int maxAttempts;
protected int startLogErrorsCnt;
@@ -41,6 +43,7 @@ abstract class AsyncAdminBuilderBase implements
AsyncAdminBuilder {
this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
this.operationTimeoutNs = connConf.getOperationTimeoutNs();
this.pauseNs = connConf.getPauseNs();
+ this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.maxAttempts = connConf.getMaxRetries();
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@@ -64,6 +67,12 @@ abstract class AsyncAdminBuilderBase implements
AsyncAdminBuilder {
}
@Override
+ public AsyncAdminBuilder setRetryPauseForCQTBE(long timeout, TimeUnit unit) {
+ this.pauseForCQTBENs = unit.toNanos(timeout);
+ return this;
+ }
+
+ @Override
public AsyncAdminBuilder setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index ce0fca7..7a381db 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -44,10 +44,10 @@ public class AsyncAdminRequestRetryingCaller<T> extends
AsyncRpcRetryingCaller<T
private ServerName serverName;
public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl
conn, int priority,
- long pauseNs, int maxAttempts, long operationTimeoutNs, long
rpcTimeoutNs,
- int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
- super(retryTimer, conn, priority, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ long pauseNs, long pauseForCQTBENs, int maxAttempts, long
operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName,
Callable<T> callable) {
+ super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts,
operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index e429422..464eff5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -45,6 +45,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.mutable.MutableBoolean;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
@@ -103,6 +104,8 @@ class AsyncBatchRpcRetryingCaller<T> {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final long operationTimeoutNs;
@@ -147,17 +150,17 @@ class AsyncBatchRpcRetryingCaller<T> {
}
public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl
conn,
- TableName tableName, List<? extends Row> actions, long pauseNs, int
maxAttempts,
- long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ TableName tableName, List<? extends Row> actions, long pauseNs, long
pauseForCQTBENs,
+ int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int
startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
+ this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
-
this.actions = new ArrayList<>(actions.size());
this.futures = new ArrayList<>(actions.size());
this.action2Future = new IdentityHashMap<>(actions.size());
@@ -337,7 +340,7 @@ class AsyncBatchRpcRetryingCaller<T> {
}
});
if (!failedActions.isEmpty()) {
- tryResubmit(failedActions.stream(), tries,
retryImmediately.booleanValue());
+ tryResubmit(failedActions.stream(), tries,
retryImmediately.booleanValue(), false);
}
}
@@ -442,24 +445,27 @@ class AsyncBatchRpcRetryingCaller<T> {
List<Action> copiedActions = actionsByRegion.values().stream().flatMap(r
-> r.actions.stream())
.collect(Collectors.toList());
addError(copiedActions, error, serverName);
- tryResubmit(copiedActions.stream(), tries, error instanceof
RetryImmediatelyException);
+ tryResubmit(copiedActions.stream(), tries, error instanceof
RetryImmediatelyException,
+ error instanceof CallQueueTooBigException);
}
- private void tryResubmit(Stream<Action> actions, int tries, boolean
immediately) {
+ private void tryResubmit(Stream<Action> actions, int tries, boolean
immediately,
+ boolean isCallQueueTooBig) {
if (immediately) {
groupAndSend(actions, tries);
return;
}
long delayNs;
+ long pauseNsToUse = isCallQueueTooBig ? pauseForCQTBENs : pauseNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
failAll(actions, tries);
return;
}
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+ delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
- delayNs = getPauseTime(pauseNs, tries - 1);
+ delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs,
TimeUnit.NANOSECONDS);
}
@@ -498,7 +504,7 @@ class AsyncBatchRpcRetryingCaller<T> {
sendOrDelay(actionsByServer, tries);
}
if (!locateFailed.isEmpty()) {
- tryResubmit(locateFailed.stream(), tries, false);
+ tryResubmit(locateFailed.stream(), tries, false, false);
}
});
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index d6cca48..5fd00a5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -73,6 +73,8 @@ class AsyncClientScanner {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final long scanTimeoutNs;
@@ -84,8 +86,8 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer,
TableName tableName,
- AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int
maxAttempts, long scanTimeoutNs,
- long rpcTimeoutNs, int startLogErrorsCnt) {
+ AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, long
pauseForCQTBENs,
+ int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int
startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
}
@@ -98,6 +100,7 @@ class AsyncClientScanner {
this.conn = conn;
this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
+ this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -160,14 +163,16 @@ class AsyncClientScanner {
}
private void startScan(OpenScannerResponse resp) {
-
addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
- .location(resp.loc).remote(resp.isRegionServerRemote)
- .scannerLeaseTimeoutPeriod(resp.resp.getTtl(),
TimeUnit.MILLISECONDS).stub(resp.stub)
-
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs,
TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
- .start(resp.controller, resp.resp), (hasMore, error) -> {
+ addListener(
+
conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
+ .remote(resp.isRegionServerRemote)
+ .scannerLeaseTimeoutPeriod(resp.resp.getTtl(),
TimeUnit.MILLISECONDS).stub(resp.stub)
+
.setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs,
TimeUnit.NANOSECONDS)
+ .pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt).start(resp.controller,
resp.resp),
+ (hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
@@ -185,8 +190,8 @@ class AsyncClientScanner {
.row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs,
TimeUnit.NANOSECONDS)
-
.maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
- .call();
+ .pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+
.startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner).call();
}
private long getPrimaryTimeoutNs() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 22042c9..6596578 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -30,6 +30,7 @@ import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_
import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_CACHING;
import static
org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY;
@@ -54,6 +55,8 @@ import static
org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFE
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Timeout configs.
@@ -61,6 +64,8 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
class AsyncConnectionConfiguration {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsyncConnectionConfiguration.class);
+
private final long metaOperationTimeoutNs;
// timeout for a whole operation such as get, put or delete. Notice that
scan will not be effected
@@ -79,6 +84,8 @@ class AsyncConnectionConfiguration {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxRetries;
/** How many retries are allowed before we start to log */
@@ -121,8 +128,16 @@ class AsyncConnectionConfiguration {
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
rpcTimeoutNs));
this.writeRpcTimeoutNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
rpcTimeoutNs));
- this.pauseNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE,
DEFAULT_HBASE_CLIENT_PAUSE));
+ long pauseMs = conf.getLong(HBASE_CLIENT_PAUSE,
DEFAULT_HBASE_CLIENT_PAUSE);
+ long pauseForCQTBEMs = conf.getLong(HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseMs);
+ if (pauseForCQTBEMs < pauseMs) {
+ LOG.warn(
+ "The {} setting: {} ms is less than the {} setting: {} ms, use the
greater one instead",
+ HBASE_CLIENT_PAUSE_FOR_CQTBE, pauseForCQTBEMs, HBASE_CLIENT_PAUSE,
pauseMs);
+ pauseForCQTBEMs = pauseMs;
+ }
+ this.pauseNs = TimeUnit.MILLISECONDS.toNanos(pauseMs);
+ this.pauseForCQTBENs = TimeUnit.MILLISECONDS.toNanos(pauseForCQTBEMs);
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER,
DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.startLogErrorsCnt =
conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY,
DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
@@ -173,6 +188,10 @@ class AsyncConnectionConfiguration {
return pauseNs;
}
+ long getPauseForCQTBENs() {
+ return pauseForCQTBENs;
+ }
+
int getMaxRetries() {
return maxRetries;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index 5ba4dee..de2778c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -44,10 +44,10 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends
AsyncRpcRetryingCall
private final Callable<T> callable;
public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer,
AsyncConnectionImpl conn,
- Callable<T> callable, int priority, long pauseNs, int maxRetries, long
operationTimeoutNs,
- long rpcTimeoutNs, int startLogErrorsCnt) {
- super(retryTimer, conn, priority, pauseNs, maxRetries, operationTimeoutNs,
rpcTimeoutNs,
- startLogErrorsCnt);
+ Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs,
int maxRetries,
+ long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxRetries,
operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
this.callable = callable;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 387b103..dcf7aa1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -31,6 +31,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
@@ -59,6 +60,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private int tries = 1;
private final int maxAttempts;
@@ -78,12 +81,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
int priority,
- long pauseNs, int maxAttempts, long operationTimeoutNs, long
rpcTimeoutNs,
- int startLogErrorsCnt) {
+ long pauseNs, long pauseForCQTBENs, int maxAttempts, long
operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.priority = priority;
this.pauseNs = pauseNs;
+ this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -123,6 +127,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
}
private void tryScheduleRetry(Throwable error, Consumer<Throwable>
updateCachedLocation) {
+ long pauseNsToUse = error instanceof CallQueueTooBigException ?
pauseForCQTBENs : pauseNs;
long delayNs;
if (operationTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
@@ -130,9 +135,9 @@ public abstract class AsyncRpcRetryingCaller<T> {
completeExceptionally();
return;
}
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+ delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
- delayNs = getPauseTime(pauseNs, tries - 1);
+ delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
tries++;
retryTimer.newTimeout(t -> doCall(), delayNs, TimeUnit.NANOSECONDS);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 513f813..48bde44 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -58,6 +58,8 @@ class AsyncRpcRetryingCallerFactory {
protected long pauseNs = conn.connConf.getPauseNs();
+ protected long pauseForCQTBENs = conn.connConf.getPauseForCQTBENs();
+
protected int maxAttempts =
retries2Attempts(conn.connConf.getMaxRetries());
protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
@@ -117,6 +119,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public SingleRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit
unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
@@ -149,8 +156,8 @@ class AsyncRpcRetryingCallerFactory {
public AsyncSingleRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
tableName, row, replicaId,
- locateType, callable, priority, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ locateType, callable, priority, pauseNs, pauseForCQTBENs, maxAttempts,
operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -256,6 +263,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public ScanSingleRegionCallerBuilder pauseForCQTBE(long pause, TimeUnit
unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
@@ -280,8 +292,8 @@ class AsyncRpcRetryingCallerFactory {
preCheck();
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
scan, scanMetrics,
scannerId, resultCache, consumer, stub, loc, isRegionServerRemote,
priority,
- scannerLeaseTimeoutPeriodNs, pauseNs, maxAttempts, scanTimeoutNs,
rpcTimeoutNs,
- startLogErrorsCnt);
+ scannerLeaseTimeoutPeriodNs, pauseNs, pauseForCQTBENs, maxAttempts,
scanTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -335,6 +347,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public BatchCallerBuilder pauseForCQTBE(long pause, TimeUnit unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
public BatchCallerBuilder maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
@@ -347,7 +364,7 @@ class AsyncRpcRetryingCallerFactory {
public <T> AsyncBatchRpcRetryingCaller<T> build() {
return new AsyncBatchRpcRetryingCaller<>(retryTimer, conn, tableName,
actions, pauseNs,
- maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
+ pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
}
public <T> List<CompletableFuture<T>> call() {
@@ -389,6 +406,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public MasterRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit
unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
public MasterRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
@@ -416,7 +438,7 @@ class AsyncRpcRetryingCallerFactory {
public AsyncMasterRequestRpcRetryingCaller<T> build() {
preCheck();
return new AsyncMasterRequestRpcRetryingCaller<T>(retryTimer, conn,
callable, priority,
- pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt);
+ pauseNs, pauseForCQTBENs, maxAttempts, operationTimeoutNs,
rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -465,6 +487,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public AdminRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit
unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
public AdminRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
@@ -487,7 +514,7 @@ class AsyncRpcRetryingCallerFactory {
public AsyncAdminRequestRetryingCaller<T> build() {
return new AsyncAdminRequestRetryingCaller<T>(retryTimer, conn,
priority, pauseNs,
- maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ pauseForCQTBENs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable,
"action is null"));
}
@@ -531,6 +558,11 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public ServerRequestCallerBuilder<T> pauseForCQTBE(long pause, TimeUnit
unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
public ServerRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
@@ -547,8 +579,8 @@ class AsyncRpcRetryingCallerFactory {
}
public AsyncServerRequestRpcRetryingCaller<T> build() {
- return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn,
pauseNs, maxAttempts,
- operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
+ return new AsyncServerRequestRpcRetryingCaller<T>(retryTimer, conn,
pauseNs, pauseForCQTBENs,
+ maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt,
checkNotNull(serverName, "serverName is null"), checkNotNull(callable,
"action is null"));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index b87d170..1fa3c81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -97,6 +98,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final long scanTimeoutNs;
@@ -304,7 +307,8 @@ class AsyncScanSingleRegionRpcRetryingCaller {
Scan scan, ScanMetrics scanMetrics, long scannerId, ScanResultCache
resultCache,
AdvancedScanResultConsumer consumer, Interface stub, HRegionLocation loc,
boolean isRegionServerRemote, int priority, long
scannerLeaseTimeoutPeriodNs, long pauseNs,
- int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int
startLogErrorsCnt) {
+ long pauseForCQTBENs, int maxAttempts, long scanTimeoutNs, long
rpcTimeoutNs,
+ int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scanMetrics = scanMetrics;
@@ -316,6 +320,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.regionServerRemote = isRegionServerRemote;
this.scannerLeaseTimeoutPeriodNs = scannerLeaseTimeoutPeriodNs;
this.pauseNs = pauseNs;
+ this.pauseForCQTBENs = pauseForCQTBENs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
@@ -405,15 +410,16 @@ class AsyncScanSingleRegionRpcRetryingCaller {
return;
}
long delayNs;
+ long pauseNsToUse = error instanceof CallQueueTooBigException ?
pauseForCQTBENs : pauseNs;
if (scanTimeoutNs > 0) {
long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
if (maxDelayNs <= 0) {
completeExceptionally(!scannerClosed);
return;
}
- delayNs = Math.min(maxDelayNs, getPauseTime(pauseNs, tries - 1));
+ delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
} else {
- delayNs = getPauseTime(pauseNs, tries - 1);
+ delayNs = getPauseTime(pauseNsToUse, tries - 1);
}
if (scannerClosed) {
completeWhenError(false);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 63c85c2..52a2abe 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -46,10 +46,10 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends
AsyncRpcRetryingCall
private ServerName serverName;
public AsyncServerRequestRpcRetryingCaller(Timer retryTimer,
AsyncConnectionImpl conn,
- long pauseNs, int maxAttempts, long operationTimeoutNs, long
rpcTimeoutNs,
- int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
- super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, maxAttempts,
operationTimeoutNs,
- rpcTimeoutNs, startLogErrorsCnt);
+ long pauseNs, long pauseForCQTBENs, int maxAttempts, long
operationTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt, ServerName serverName,
Callable<T> callable) {
+ super(retryTimer, conn, HConstants.NORMAL_QOS, pauseNs, pauseForCQTBENs,
maxAttempts,
+ operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
this.serverName = serverName;
this.callable = callable;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 9b0dede..2a552c7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -56,10 +56,10 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends
AsyncRpcRetryingCaller<T> {
public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer,
AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType
locateType,
- Callable<T> callable, int priority, long pauseNs, int maxAttempts, long
operationTimeoutNs,
- long rpcTimeoutNs, int startLogErrorsCnt) {
- super(retryTimer, conn, priority, pauseNs, maxAttempts,
operationTimeoutNs, rpcTimeoutNs,
- startLogErrorsCnt);
+ Callable<T> callable, int priority, long pauseNs, long pauseForCQTBENs,
int maxAttempts,
+ long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ super(retryTimer, conn, priority, pauseNs, pauseForCQTBENs, maxAttempts,
operationTimeoutNs,
+ rpcTimeoutNs, startLogErrorsCnt);
this.tableName = tableName;
this.row = row;
this.replicaId = replicaId;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
index 6632ad5..4c883a8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -76,10 +76,23 @@ public interface AsyncTableBuilder<C extends
ScanResultConsumerBase> {
/**
* Set the base pause time for retrying. We use an exponential policy to
generate sleep time when
* retrying.
+ * @see #setRetryPauseForCQTBE(long, TimeUnit)
*/
AsyncTableBuilder<C> setRetryPause(long pause, TimeUnit unit);
/**
+ * Set the base pause time for retrying when we hit {@code
CallQueueTooBigException}. We use an
+ * exponential policy to generate sleep time when retrying.
+ * <p/>
+ * This value should be greater than the normal pause value which could be
set with the above
+ * {@link #setRetryPause(long, TimeUnit)} method, as usually {@code
CallQueueTooBigException}
+ * means the server is overloaded. If you specify a smaller value, the normal pause value is used
+ * for {@code CallQueueTooBigException} instead.
+ * @see #setRetryPause(long, TimeUnit)
+ */
+ AsyncTableBuilder<C> setRetryPauseForCQTBE(long pause, TimeUnit unit);
+
+ /**
* Set the max retry times for an operation. Usually it is the max attempt
times minus 1.
* <p>
* Operation timeout and max attempt times(or max retry times) are both
limitations for retrying,
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
index ee571f1..399d9dd 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -45,6 +45,8 @@ abstract class AsyncTableBuilderBase<C extends
ScanResultConsumerBase>
protected long pauseNs;
+ protected long pauseForCQTBENs;
+
protected int maxAttempts;
protected int startLogErrorsCnt;
@@ -58,6 +60,7 @@ abstract class AsyncTableBuilderBase<C extends
ScanResultConsumerBase>
this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
this.pauseNs = connConf.getPauseNs();
+ this.pauseForCQTBENs = connConf.getPauseForCQTBENs();
this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
}
@@ -99,6 +102,12 @@ abstract class AsyncTableBuilderBase<C extends
ScanResultConsumerBase>
}
@Override
+ public AsyncTableBuilderBase<C> setRetryPauseForCQTBE(long pause, TimeUnit
unit) {
+ this.pauseForCQTBENs = unit.toNanos(pause);
+ return this;
+ }
+
+ @Override
public AsyncTableBuilderBase<C> setMaxAttempts(int maxAttempts) {
this.maxAttempts = maxAttempts;
return this;
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index 35cb922..0fd3cba 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -327,6 +327,8 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final int startLogErrorsCnt;
@@ -341,6 +343,16 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.operationTimeoutNs = builder.operationTimeoutNs;
this.pauseNs = builder.pauseNs;
+ if (builder.pauseForCQTBENs < builder.pauseNs) {
+ LOG.warn(
+ "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+ " the normal pause value {} ms, use the greater one instead",
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+ this.pauseForCQTBENs = builder.pauseNs;
+ } else {
+ this.pauseForCQTBENs = builder.pauseForCQTBENs;
+ }
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.ng = connection.getNonceGenerator();
@@ -348,18 +360,18 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <T> MasterRequestCallerBuilder<T> newMasterCaller() {
return this.connection.callerFactory.<T> masterRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
private <T> AdminRequestCallerBuilder<T> newAdminCaller() {
return this.connection.callerFactory.<T> adminRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@FunctionalInterface
@@ -3357,10 +3369,10 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
private <T> ServerRequestCallerBuilder<T> newServerCaller() {
return this.connection.callerFactory.<T> serverRequest()
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
@Override
diff --git
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index b2ca3a9..8050137 100644
---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++
b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
@@ -77,6 +79,8 @@ import
org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
+ private static final Logger LOG =
LoggerFactory.getLogger(RawAsyncTableImpl.class);
+
private final AsyncConnectionImpl conn;
private final Timer retryTimer;
@@ -99,6 +103,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
private final long pauseNs;
+ private final long pauseForCQTBENs;
+
private final int maxAttempts;
private final int startLogErrorsCnt;
@@ -113,6 +119,16 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
this.operationTimeoutNs = builder.operationTimeoutNs;
this.scanTimeoutNs = builder.scanTimeoutNs;
this.pauseNs = builder.pauseNs;
+ if (builder.pauseForCQTBENs < builder.pauseNs) {
+ LOG.warn(
+ "Configured value of pauseForCQTBENs is {} ms, which is less than" +
+ " the normal pause value {} ms, use the greater one instead",
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseForCQTBENs),
+ TimeUnit.NANOSECONDS.toMillis(builder.pauseNs));
+ this.pauseForCQTBENs = builder.pauseNs;
+ } else {
+ this.pauseForCQTBENs = builder.pauseForCQTBENs;
+ }
this.maxAttempts = builder.maxAttempts;
this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.defaultScannerCaching = tableName.isSystemTable() ?
conn.connConf.getMetaScannerCaching()
@@ -220,8 +236,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
return conn.callerFactory.<T>
single().table(tableName).row(row).priority(priority)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
- .startLogErrorsCnt(startLogErrorsCnt);
+ .pause(pauseNs, TimeUnit.NANOSECONDS).pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt);
}
private <T, R extends OperationWithAttributes & Row>
SingleRequestCallerBuilder<T> newCaller(
@@ -451,7 +467,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
@Override
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName,
conn, retryTimer,
- pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
startLogErrorsCnt).start();
+ pauseNs, pauseForCQTBENs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs,
startLogErrorsCnt)
+ .start();
}
private long resultSize2CacheSize(long maxResultSize) {
@@ -521,7 +538,8 @@ class RawAsyncTableImpl implements
AsyncTable<AdvancedScanResultConsumer> {
return conn.callerFactory.batch().table(tableName).actions(actions)
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs,
TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
+ .pauseForCQTBE(pauseForCQTBENs,
TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
new file mode 100644
index 0000000..075e1bc
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForCallQueueTooBig.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
+import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory;
+import org.apache.hadoop.hbase.regionserver.SimpleRpcSchedulerFactory;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import
org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
+
+/**
+ * Checks that the async client waits for the dedicated CQTBE pause
+ * ({@link HConstants#HBASE_CLIENT_PAUSE_FOR_CQTBE}), instead of the much shorter normal pause,
+ * before retrying a call that the region server refused to queue.
+ */
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncClientPauseForCallQueueTooBig {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncClientPauseForCallQueueTooBig.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final TableName TABLE_NAME = TableName.valueOf("CQTBE");
+
+  private static final byte[] FAMILY = Bytes.toBytes("Family");
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("Qualifier");
+
+  // Pause after a CallQueueTooBigException. It is far larger than the normal 10 ms pause set in
+  // setUp, so the elapsed time of an operation shows which pause the client actually used.
+  private static final long PAUSE_FOR_CQTBE_NS = TimeUnit.SECONDS.toNanos(1);
+
+  private static AsyncConnection CONN;
+
+  // Toggled by the JUnit thread in setUpBeforeTest/tearDownAfterTest, but read inside
+  // CQTBERpcScheduler.dispatch while the test thread is blocked waiting for the RPC result,
+  // i.e. on a different thread. volatile makes the write reliably visible there.
+  private static volatile boolean FAIL = false;
+
+  // Per-RPC-method dispatch counters; CQTBERpcScheduler uses them to reject every other call.
+  private static final ConcurrentMap<MethodDescriptor, AtomicInteger> INVOKED =
+    new ConcurrentHashMap<>();
+
+  /**
+   * A {@link SimpleRpcScheduler} which, while {@code FAIL} is set, refuses to dispatch the 1st,
+   * 3rd, 5th, ... call of every RPC method, so that the client hits a
+   * {@code CallQueueTooBigException} and has to retry.
+   */
+  public static final class CQTBERpcScheduler extends SimpleRpcScheduler {
+
+    public CQTBERpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
+        int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
+        Abortable server, int highPriorityLevel) {
+      super(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
+        metaTransitionHandler, priority, server, highPriorityLevel);
+    }
+
+    @Override
+    public boolean dispatch(CallRunner callTask) throws InterruptedException {
+      if (FAIL) {
+        MethodDescriptor method = callTask.getRpcCall().getMethod();
+        // This is for testScan: we send an open scanner request first and then a next request,
+        // and we expect to hit CQTBE once for each of them, i.e. two times in total.
+        int invocation =
+          INVOKED.computeIfAbsent(method, k -> new AtomicInteger(0)).getAndIncrement();
+        if (invocation % 2 == 0) {
+          return false;
+        }
+      }
+      return super.dispatch(callTask);
+    }
+  }
+
+  /**
+   * Creates a {@link CQTBERpcScheduler} with the same handler counts that the region server
+   * configuration would give a regular {@link SimpleRpcScheduler}.
+   */
+  public static final class CQTBERpcSchedulerFactory extends SimpleRpcSchedulerFactory {
+
+    @Override
+    public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
+      int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+      return new CQTBERpcScheduler(conf, handlerCount,
+        conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
+        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+          HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+        conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
+          HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
+        priority, server, HConstants.QOS_THRESHOLD);
+    }
+
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+    UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE,
+      TimeUnit.NANOSECONDS.toMillis(PAUSE_FOR_CQTBE_NS));
+    UTIL.getConfiguration().setClass(RSRpcServices.REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+      CQTBERpcSchedulerFactory.class, RpcSchedulerFactory.class);
+    UTIL.startMiniCluster(1);
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUpBeforeTest() throws IOException {
+    try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i)));
+      }
+    }
+    // Only start rejecting calls once the table has been created and loaded.
+    FAIL = true;
+  }
+
+  @After
+  public void tearDownAfterTest() throws IOException {
+    FAIL = false;
+    INVOKED.clear();
+    UTIL.getAdmin().disableTable(TABLE_NAME);
+    UTIL.getAdmin().deleteTable(TABLE_NAME);
+  }
+
+  /**
+   * Runs {@code callable} and asserts that it took strictly longer than {@code time}
+   * nanoseconds.
+   */
+  private void assertTime(Callable<Void> callable, long time) throws Exception {
+    long startNs = System.nanoTime();
+    callable.call();
+    long costNs = System.nanoTime() - startNs;
+    assertTrue(
+      "Expected the operation to take more than " + time + " ns, but it took " + costNs + " ns",
+      costNs > time);
+  }
+
+  @Test
+  public void testGet() throws Exception {
+    assertTime(() -> {
+      Result result = CONN.getTable(TABLE_NAME).get(new Get(Bytes.toBytes(0))).get();
+      assertArrayEquals(Bytes.toBytes(0), result.getValue(FAMILY, QUALIFIER));
+      return null;
+    }, PAUSE_FOR_CQTBE_NS);
+  }
+
+  @Test
+  public void testBatch() throws Exception {
+    assertTime(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME)) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+    }, PAUSE_FOR_CQTBE_NS);
+  }
+
+  @Test
+  public void testScan() throws Exception {
+    // We hit CallQueueTooBigException two times (open scanner and next), so the operation must
+    // take longer than twice the CQTBE pause.
+    assertTime(() -> {
+      try (ResultScanner scanner =
+        CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+        }
+        assertNull(scanner.next());
+      }
+      return null;
+    }, PAUSE_FOR_CQTBE_NS * 2);
+  }
+}