This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 86e3e555eea HBASE-27798 Addendum changes from master branch
86e3e555eea is described below

commit 86e3e555eeabe3f7afe337a96b583f0371a26b64
Author: Ray Mattingly <rmdmattin...@gmail.com>
AuthorDate: Tue Jul 11 08:31:20 2023 -0400

    HBASE-27798 Addendum changes from master branch
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 .../hbase/client/AsyncBatchRpcRetryingCaller.java  | 33 ++------
 .../hbase/client/AsyncRpcRetryingCaller.java       | 24 ++----
 .../AsyncScanSingleRegionRpcRetryingCaller.java    | 25 +-----
 .../hadoop/hbase/client/ConnectionUtils.java       |  2 +-
 .../backoff/HBaseServerExceptionPauseManager.java  | 41 +++++++++-
 .../TestHBaseServerExceptionPauseManager.java      | 75 +++++++++++++++---
 .../TestAsyncClientPauseForRpcThrottling.java      | 92 +++++++++++++++++++++-
 7 files changed, 207 insertions(+), 85 deletions(-)

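Context for the diff below: this addendum backports the master-branch refactor that centralizes retry-pause and timeout bookkeeping in HBaseServerExceptionPauseManager, whose constructor now takes the operation timeout as a third argument. A minimal usage sketch against the new API (the class name PauseManagerDemo and the example durations are illustrative, and it assumes hbase-client is on the classpath):

  import java.util.OptionalLong;
  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.hbase.HBaseServerException;
  import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;

  public class PauseManagerDemo {
    public static void main(String[] args) {
      // New three-argument constructor: the timeout budget now lives in the manager.
      HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
        TimeUnit.MILLISECONDS.toNanos(100), // pauseNs
        TimeUnit.SECONDS.toNanos(1),        // pauseNsForServerOverloaded
        TimeUnit.MINUTES.toNanos(1));       // operationTimeoutNs (0 disables the timeout)

      // An empty OptionalLong means sleeping would exceed the remaining budget,
      // so the caller should fail now instead of pausing.
      OptionalLong delayNs = pauseManager.getPauseNsFromException(
        new HBaseServerException(true), 1, System.nanoTime());
      delayNs.ifPresent(ns -> System.out.println("would pause " + ns + "ns"));
    }
  }
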
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index f8363b87e70..063d3df34d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -18,9 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcPriority;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
@@ -162,7 +160,8 @@ class AsyncBatchRpcRetryingCaller<T> {
     this.actions = new ArrayList<>(actions.size());
     this.futures = new ArrayList<>(actions.size());
     this.action2Future = new IdentityHashMap<>(actions.size());
-    this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
+    this.pauseManager =
+      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
     for (int i = 0, n = actions.size(); i < n; i++) {
       Row rawAction = actions.get(i);
       Action action;
@@ -203,10 +202,6 @@ class AsyncBatchRpcRetryingCaller<T> {
     return false;
   }
 
-  private long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
-  }
-
   private List<ThrowableWithExtraContext> removeErrors(Action action) {
     synchronized (action2Errors) {
       return action2Errors.remove(action);
@@ -366,7 +361,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   private void sendToServer(ServerName serverName, ServerRequest serverReq, int tries) {
     long remainingNs;
     if (operationTimeoutNs > 0) {
-      remainingNs = remainingTimeNs();
+      remainingNs = pauseManager.remainingTimeNs(startNs);
       if (remainingNs <= 0) {
         failAll(serverReq.actionsByRegion.values().stream().flatMap(r -> r.actions.stream()),
           tries);
@@ -473,27 +468,13 @@ class AsyncBatchRpcRetryingCaller<T> {
       groupAndSend(actions, tries);
       return;
     }
-    long delayNs;
-    boolean isServerOverloaded = HBaseServerException.isServerOverloaded(error);
-    OptionalLong maybePauseNsToUse =
-      pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
+    OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
     if (!maybePauseNsToUse.isPresent()) {
       failAll(actions, tries);
       return;
     }
-    long pauseNsToUse = maybePauseNsToUse.getAsLong();
-
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        failAll(actions, tries);
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNsToUse, tries - 1);
-    }
-    if (isServerOverloaded) {
+    long delayNs = maybePauseNsToUse.getAsLong();
+    if (HBaseServerException.isServerOverloaded(error)) {
       Optional<MetricsConnection> metrics = conn.getConnectionMetrics();
       metrics.ifPresent(m -> m.incrementServerOverloadedBackoffTime(delayNs, TimeUnit.NANOSECONDS));
     }
@@ -503,7 +484,7 @@ class AsyncBatchRpcRetryingCaller<T> {
   private void groupAndSend(Stream<Action> actions, int tries) {
     long locateTimeoutNs;
     if (operationTimeoutNs > 0) {
-      locateTimeoutNs = remainingTimeNs();
+      locateTimeoutNs = pauseManager.remainingTimeNs(startNs);
       if (locateTimeoutNs <= 0) {
         failAll(actions, tries);
         return;
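With the pause computation centralized, the caller-side error path above reduces to unwrap-or-fail: if getPauseNsFromException returns empty, the budget is exhausted and the caller fails rather than sleeping. A hedged sketch of that behavior (class name FailFastSketch is illustrative; the 1ns timeout mirrors the case the new unit tests exercise):

  import java.util.OptionalLong;
  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.hbase.client.backoff.HBaseServerExceptionPauseManager;

  public class FailFastSketch {
    public static void main(String[] args) {
      // A 1ns timeout is already spent by the time we ask for a pause, so the
      // manager returns empty and the caller's failAll(...) path runs instead.
      HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
        TimeUnit.MILLISECONDS.toNanos(100), TimeUnit.SECONDS.toNanos(1), 1L);
      OptionalLong pause = pauseManager.getPauseNsFromException(
        new RuntimeException("transient failure"), 1, System.nanoTime());
      System.out.println(pause.isPresent() ? "pause " + pause.getAsLong() + "ns" : "fail now");
    }
  }
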
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 7ded355cfd9..f89aef21439 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
 
@@ -93,7 +91,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
     this.controller.setPriority(priority);
     this.exceptions = new ArrayList<>();
     this.startNs = System.nanoTime();
-    this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
+    this.pauseManager =
+      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, operationTimeoutNs);
   }
 
   private long elapsedMs() {
@@ -101,7 +100,7 @@ public abstract class AsyncRpcRetryingCaller<T> {
   }
 
   protected final long remainingTimeNs() {
-    return operationTimeoutNs - (System.nanoTime() - startNs);
+    return pauseManager.remainingTimeNs(startNs);
   }
 
   protected final void completeExceptionally() {
@@ -124,25 +123,12 @@ public abstract class AsyncRpcRetryingCaller<T> {
   }
 
   private void tryScheduleRetry(Throwable error) {
-    OptionalLong maybePauseNsToUse =
-      pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
+    OptionalLong maybePauseNsToUse = pauseManager.getPauseNsFromException(error, tries, startNs);
     if (!maybePauseNsToUse.isPresent()) {
       completeExceptionally();
       return;
     }
-    long pauseNsToUse = maybePauseNsToUse.getAsLong();
-
-    long delayNs;
-    if (operationTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        completeExceptionally();
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNsToUse, tries - 1);
-    }
+    long delayNs = maybePauseNsToUse.getAsLong();
     tries++;
 
     if (HBaseServerException.isServerOverloaded(error)) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 949ea07107b..a046f0c7b6e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -17,8 +17,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.noMoreResultsForReverseScan;
@@ -344,17 +342,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
     this.controller = conn.rpcControllerFactory.newController();
     this.controller.setPriority(priority);
     this.exceptions = new ArrayList<>();
-    this.pauseManager = new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded);
+    this.pauseManager =
+      new HBaseServerExceptionPauseManager(pauseNs, pauseNsForServerOverloaded, scanTimeoutNs);
   }
 
   private long elapsedMs() {
     return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - nextCallStartNs);
   }
 
-  private long remainingTimeNs() {
-    return scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
-  }
-
   private void closeScanner() {
     incRPCCallsMetrics(scanMetrics, regionServerRemote);
     resetController(controller, rpcTimeoutNs, HConstants.HIGH_QOS);
@@ -417,26 +412,14 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeExceptionally(!scannerClosed);
       return;
     }
-    long delayNs;
 
     OptionalLong maybePauseNsToUse =
-      pauseManager.getPauseNsFromException(error, remainingTimeNs() - SLEEP_DELTA_NS);
+      pauseManager.getPauseNsFromException(error, tries, nextCallStartNs);
     if (!maybePauseNsToUse.isPresent()) {
       completeExceptionally(!scannerClosed);
       return;
     }
-    long pauseNsToUse = maybePauseNsToUse.getAsLong();
-
-    if (scanTimeoutNs > 0) {
-      long maxDelayNs = remainingTimeNs() - SLEEP_DELTA_NS;
-      if (maxDelayNs <= 0) {
-        completeExceptionally(!scannerClosed);
-        return;
-      }
-      delayNs = Math.min(maxDelayNs, getPauseTime(pauseNsToUse, tries - 1));
-    } else {
-      delayNs = getPauseTime(pauseNsToUse, tries - 1);
-    }
+    long delayNs = maybePauseNsToUse.getAsLong();
     if (scannerClosed) {
       completeWhenError(false);
       return;
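Note that the scan caller passes nextCallStartNs rather than an operation-wide start time, so the budget handed to the pause manager is the scan timeout for the current call. A small sketch of that arithmetic (class name ScanBudgetSketch and the durations are illustrative):

  import java.util.concurrent.TimeUnit;

  public class ScanBudgetSketch {
    public static void main(String[] args) throws InterruptedException {
      long scanTimeoutNs = TimeUnit.MILLISECONDS.toNanos(50);
      long nextCallStartNs = System.nanoTime(); // reset at the start of each scan RPC
      Thread.sleep(10);                         // simulate time spent in the call
      // Same formula the removed private remainingTimeNs() used:
      long remainingNs = scanTimeoutNs - (System.nanoTime() - nextCallStartNs);
      System.out.println("remaining scan budget: " + remainingNs + "ns");
    }
  }
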
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 5c88af3780c..9b44c682b4a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -280,7 +280,7 @@ public final class ConnectionUtils {
   }
 
   // Add a delta to avoid timeout immediately after a retry sleeping.
-  static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
+  public static final long SLEEP_DELTA_NS = TimeUnit.MILLISECONDS.toNanos(1);
 
   static Get toCheckExistenceOnly(Get get) {
     if (get.isCheckExistenceOnly()) {
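SLEEP_DELTA_NS becomes public because the consolidated logic in the backoff package now subtracts it from the remaining budget, so a retry is never scheduled so close to the deadline that it times out immediately after waking. A tiny sketch of the arithmetic (class name and durations illustrative):

  import java.util.concurrent.TimeUnit;

  public class SleepDeltaSketch {
    public static void main(String[] args) {
      long timeoutNs = TimeUnit.SECONDS.toNanos(3);
      long elapsedNs = TimeUnit.MILLISECONDS.toNanos(2999);
      long sleepDeltaNs = TimeUnit.MILLISECONDS.toNanos(1); // ConnectionUtils.SLEEP_DELTA_NS
      // Reserving the delta leaves room to actually issue the retry after sleeping.
      long budgetNs = (timeoutNs - elapsedNs) - sleepDeltaNs;
      System.out.println(budgetNs > 0 ? "can retry within " + budgetNs + "ns" : "fail now");
    }
  }
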
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
index 235d1d1d20e..67f46822fe3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/backoff/HBaseServerExceptionPauseManager.java
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.hbase.client.backoff;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
+
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.HBaseServerException;
@@ -31,29 +34,59 @@ public class HBaseServerExceptionPauseManager {
 
   private final long pauseNs;
   private final long pauseNsForServerOverloaded;
+  private final long timeoutNs;
 
-  public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded) {
+  public HBaseServerExceptionPauseManager(long pauseNs, long pauseNsForServerOverloaded,
+    long timeoutNs) {
     this.pauseNs = pauseNs;
     this.pauseNsForServerOverloaded = pauseNsForServerOverloaded;
+    this.timeoutNs = timeoutNs;
   }
 
-  public OptionalLong getPauseNsFromException(Throwable error, long remainingTimeNs) {
+  /**
+   * Returns the nanos, if any, for which the client should wait
+   * @param error The exception from the server
+   * @param tries The current retry count
+   * @return The time, in nanos, to pause. If empty then pausing would exceed our timeout, so we
+   *         should throw now
+   */
+  public OptionalLong getPauseNsFromException(Throwable error, int tries, long startNs) {
     long expectedSleepNs;
+    long remainingTimeNs = remainingTimeNs(startNs) - SLEEP_DELTA_NS;
     if (error instanceof RpcThrottlingException) {
       RpcThrottlingException rpcThrottlingException = (RpcThrottlingException) error;
       expectedSleepNs = TimeUnit.MILLISECONDS.toNanos(rpcThrottlingException.getWaitInterval());
-      if (expectedSleepNs > remainingTimeNs) {
+      if (expectedSleepNs > remainingTimeNs && remainingTimeNs > 0) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("RpcThrottlingException suggested pause of {}ns which 
would exceed "
+            + "the timeout. We should throw instead.", expectedSleepNs, 
rpcThrottlingException);
+        }
         return OptionalLong.empty();
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Sleeping for {}ms after catching RpcThrottlingException", 
expectedSleepNs,
+        LOG.debug("Sleeping for {}ns after catching RpcThrottlingException", 
expectedSleepNs,
           rpcThrottlingException);
       }
     } else {
       expectedSleepNs =
         HBaseServerException.isServerOverloaded(error) ? pauseNsForServerOverloaded : pauseNs;
+      // RpcThrottlingException tells us exactly how long the client should wait for,
+      // so we should not factor in the retry count for said exception
+      expectedSleepNs = getPauseTime(expectedSleepNs, tries - 1);
+    }
+
+    if (timeoutNs > 0) {
+      if (remainingTimeNs <= 0) {
+        return OptionalLong.empty();
+      }
+      expectedSleepNs = Math.min(remainingTimeNs, expectedSleepNs);
     }
+
     return OptionalLong.of(expectedSleepNs);
   }
 
+  public long remainingTimeNs(long startNs) {
+    return timeoutNs - (System.nanoTime() - startNs);
+  }
+
 }
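The consolidated policy above reads: an RpcThrottlingException carries the exact wait interval from the server, so it is used as-is; any other exception gets the base pause scaled by ConnectionUtils.getPauseTime (the HConstants.RETRY_BACKOFF multipliers plus up to 1% jitter); and when a timeout is configured, both paths are capped by, or rejected against, the remaining budget. A dependency-free paraphrase follows (backoff table abbreviated; this is a sketch of the logic, not the shipped implementation):

  import java.util.OptionalLong;

  public class PausePolicySketch {
    static final int[] BACKOFF = { 1, 2, 3, 5, 10 }; // abbreviated HConstants.RETRY_BACKOFF

    // serverWaitNs is non-null only for the throttling path.
    static OptionalLong pauseNs(Long serverWaitNs, long basePauseNs, int tries, long remainingNs,
        boolean timeoutEnabled) {
      long expected;
      if (serverWaitNs != null) {
        if (timeoutEnabled && serverWaitNs > remainingNs) {
          return OptionalLong.empty(); // the mandated wait would blow the deadline
        }
        expected = serverWaitNs; // server dictates the wait; no retry scaling
      } else {
        expected = basePauseNs * BACKOFF[Math.min(tries - 1, BACKOFF.length - 1)];
      }
      if (timeoutEnabled) {
        if (remainingNs <= 0) {
          return OptionalLong.empty();
        }
        expected = Math.min(remainingNs, expected);
      }
      return OptionalLong.of(expected);
    }

    public static void main(String[] args) {
      System.out.println(pauseNs(null, 100L, 3, 1_000L, true));   // backoff-scaled, capped
      System.out.println(pauseNs(5_000L, 100L, 1, 1_000L, true)); // throttle wait > budget: empty
    }
  }
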
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
index 793fa9f6d21..ee4ee47f185 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/backoff/TestHBaseServerExceptionPauseManager.java
@@ -47,40 +47,93 @@ public class TestHBaseServerExceptionPauseManager {
   private final Throwable OTHER_EXCEPTION = new RuntimeException("");
  private final HBaseServerException SERVER_OVERLOADED_EXCEPTION = new HBaseServerException(true);
 
-  private final HBaseServerExceptionPauseManager pauseManager =
-    new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED);
-
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestHBaseServerExceptionPauseManager.class);
 
   @Test
-  public void itSupportsRpcThrottlingNanos() {
+  public void itSupportsRpcThrottlingNanosNoTimeout() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
     OptionalLong pauseNanos =
-      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, Long.MAX_VALUE);
+      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+    assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
+  }
+
+  @Test
+  public void itSupportsRpcThrottlingNanosLenientTimeout() {
+    HBaseServerExceptionPauseManager pauseManager = new HBaseServerExceptionPauseManager(
+      PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, System.nanoTime() * 2);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
     assertTrue(pauseNanos.isPresent());
     assertEquals(pauseNanos.getAsLong(), WAIT_INTERVAL_NANOS);
   }
 
   @Test
   public void itSupportsServerOverloadedExceptionNanos() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
     OptionalLong pauseNanos =
-      pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, Long.MAX_VALUE);
+      pauseManager.getPauseNsFromException(SERVER_OVERLOADED_EXCEPTION, 1, System.nanoTime());
+
     assertTrue(pauseNanos.isPresent());
-    assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS_FOR_SERVER_OVERLOADED);
+    // account for 1% jitter in pause time
+    assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 0.99);
+    assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS_FOR_SERVER_OVERLOADED * 1.01);
   }
 
   @Test
   public void itSupportsOtherExceptionNanos() {
-    OptionalLong pauseNanos = pauseManager.getPauseNsFromException(OTHER_EXCEPTION, Long.MAX_VALUE);
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
     assertTrue(pauseNanos.isPresent());
-    assertEquals(pauseNanos.getAsLong(), PAUSE_NANOS);
+    // account for 1% jitter in pause time
+    assertTrue(pauseNanos.getAsLong() >= PAUSE_NANOS * 0.99);
+    assertTrue(pauseNanos.getAsLong() <= PAUSE_NANOS * 1.01);
   }
 
   @Test
-  public void itThrottledTimeoutFastFail() {
-    OptionalLong pauseNanos = pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 0L);
+  public void itTimesOutRpcThrottlingException() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(RPC_THROTTLING_EXCEPTION, 1, System.nanoTime());
+
+    assertFalse(pauseNanos.isPresent());
+  }
+
+  @Test
+  public void itTimesOutRpcOtherException() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 1);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
     assertFalse(pauseNanos.isPresent());
   }
 
+  @Test
+  public void itDoesNotTimeOutIfDisabled() {
+    HBaseServerExceptionPauseManager pauseManager =
+      new HBaseServerExceptionPauseManager(PAUSE_NANOS, PAUSE_NANOS_FOR_SERVER_OVERLOADED, 0);
+
+    OptionalLong pauseNanos =
+      pauseManager.getPauseNsFromException(OTHER_EXCEPTION, 1, System.nanoTime());
+
+    assertTrue(pauseNanos.isPresent());
+  }
+
 }
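Because getPauseTime adds up to 1% random jitter, the updated tests above assert a band around the expected pause rather than exact equality. A hedged sketch of that style of check (class and method names are illustrative):

  public class JitterBandSketch {
    // Mirrors the band the updated tests assert: within 1% of the nominal pause.
    static boolean withinJitterBand(long nominalNs, long actualNs) {
      return actualNs >= nominalNs * 0.99 && actualNs <= nominalNs * 1.01;
    }

    public static void main(String[] args) {
      long nominal = 1_000_000L;
      System.out.println(withinJitterBand(nominal, 1_004_000L)); // true, +0.4%
      System.out.println(withinJitterBand(nominal, 1_020_000L)); // false, +2%
    }
  }
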
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
index ab63a9cb3c3..1523914213a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncClientPauseForRpcThrottling.java
@@ -29,10 +29,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.quotas.RpcThrottlingException;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
@@ -69,7 +71,10 @@ public class TestAsyncClientPauseForRpcThrottling {
 
   private static AsyncConnection CONN;
   private static final AtomicBoolean THROTTLE = new AtomicBoolean(false);
+  private static final AtomicInteger FORCE_RETRIES = new AtomicInteger(0);
   private static final long WAIT_INTERVAL_NANOS = TimeUnit.SECONDS.toNanos(1);
+  private static final int RETRY_COUNT = 3;
+  private static final int MAX_MULTIPLIER_EXPECTATION = 2;
 
  public static final class ThrottlingRSRpcServicesForTest extends RSRpcServices {
 
@@ -80,6 +85,7 @@ public class TestAsyncClientPauseForRpcThrottling {
     @Override
     public ClientProtos.GetResponse get(RpcController controller, ClientProtos.GetRequest request)
       throws ServiceException {
+      maybeForceRetry();
       maybeThrottle();
       return super.get(controller, request);
     }
@@ -87,6 +93,7 @@ public class TestAsyncClientPauseForRpcThrottling {
     @Override
     public ClientProtos.MultiResponse multi(RpcController rpcc, ClientProtos.MultiRequest request)
       throws ServiceException {
+      maybeForceRetry();
       maybeThrottle();
       return super.multi(rpcc, request);
     }
@@ -94,10 +101,18 @@ public class TestAsyncClientPauseForRpcThrottling {
     @Override
     public ClientProtos.ScanResponse scan(RpcController controller,
       ClientProtos.ScanRequest request) throws ServiceException {
+      maybeForceRetry();
       maybeThrottle();
       return super.scan(controller, request);
     }
 
+    private void maybeForceRetry() throws ServiceException {
+      if (FORCE_RETRIES.get() > 0) {
+        FORCE_RETRIES.addAndGet(-1);
+        throw new ServiceException(new RegionTooBusyException("Retry"));
+      }
+    }
+
     private void maybeThrottle() throws ServiceException {
       if (THROTTLE.get()) {
         THROTTLE.set(false);
@@ -121,6 +136,12 @@ public class TestAsyncClientPauseForRpcThrottling {
 
   @BeforeClass
   public static void setUp() throws Exception {
+    assertTrue(
+      "The MAX_MULTIPLIER_EXPECTATION must be less than 
HConstants.RETRY_BACKOFF[RETRY_COUNT] "
+        + "in order for our tests to adequately verify that we aren't "
+        + "multiplying throttled pauses based on the retry count.",
+      MAX_MULTIPLIER_EXPECTATION < HConstants.RETRY_BACKOFF[RETRY_COUNT]);
+
     UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     UTIL.startMiniCluster(1);
     UTIL.getMiniHBaseCluster().getConfiguration().setClass(HConstants.REGION_SERVER_IMPL,
@@ -149,9 +170,7 @@ public class TestAsyncClientPauseForRpcThrottling {
   }
 
  private void assertTime(Callable<Void> callable, long time, boolean isGreater) throws Exception {
-    long startNs = System.nanoTime();
-    callable.call();
-    long costNs = System.nanoTime() - startNs;
+    long costNs = getCostNs(callable);
     if (isGreater) {
       assertTrue(costNs > time);
     } else {
@@ -159,6 +178,18 @@ public class TestAsyncClientPauseForRpcThrottling {
     }
   }
 
+  private void assertTimeBetween(Callable<Void> callable, long minNs, long maxNs) throws Exception {
+    long costNs = getCostNs(callable);
+    assertTrue(costNs > minNs);
+    assertTrue(costNs < maxNs);
+  }
+
+  private long getCostNs(Callable<Void> callable) throws Exception {
+    long startNs = System.nanoTime();
+    callable.call();
+    return System.nanoTime() - startNs;
+  }
+
   @Test
   public void itWaitsForThrottledGet() throws Exception {
     boolean isThrottled = true;
@@ -193,6 +224,21 @@ public class TestAsyncClientPauseForRpcThrottling {
     }, WAIT_INTERVAL_NANOS, false);
   }
 
+  @Test
+  public void itDoesNotMultiplyThrottledGetWait() throws Exception {
+    THROTTLE.set(true);
+    FORCE_RETRIES.set(RETRY_COUNT);
+
+    AsyncTable<AdvancedScanResultConsumer> table =
+      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
+
+    assertTimeBetween(() -> {
+      table.get(new Get(Bytes.toBytes(0))).get();
+      return null;
+    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+  }
+
   @Test
   public void itWaitsForThrottledBatch() throws Exception {
     boolean isThrottled = true;
@@ -244,6 +290,26 @@ public class TestAsyncClientPauseForRpcThrottling {
     }, WAIT_INTERVAL_NANOS, false);
   }
 
+  @Test
+  public void itDoesNotMultiplyThrottledBatchWait() throws Exception {
+    THROTTLE.set(true);
+    FORCE_RETRIES.set(RETRY_COUNT);
+
+    assertTimeBetween(() -> {
+      List<CompletableFuture<?>> futures = new ArrayList<>();
+      try (AsyncBufferedMutator mutator =
+        CONN.getBufferedMutatorBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+          .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build()) {
+        for (int i = 100; i < 110; i++) {
+          futures.add(mutator
+            .mutate(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))));
+        }
+      }
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+      return null;
+    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+  }
+
   @Test
   public void itWaitsForThrottledScan() throws Exception {
     boolean isThrottled = true;
@@ -291,4 +357,24 @@ public class TestAsyncClientPauseForRpcThrottling {
       return null;
     }, WAIT_INTERVAL_NANOS, false);
   }
+
+  @Test
+  public void itDoesNotMultiplyThrottledScanWait() throws Exception {
+    THROTTLE.set(true);
+    FORCE_RETRIES.set(RETRY_COUNT);
+
+    AsyncTable<AdvancedScanResultConsumer> table =
+      CONN.getTableBuilder(TABLE_NAME).setOperationTimeout(1, TimeUnit.MINUTES)
+        .setMaxRetries(RETRY_COUNT + 1).setRetryPause(1, TimeUnit.NANOSECONDS).build();
+
+    assertTimeBetween(() -> {
+      try (ResultScanner scanner = table.getScanner(new Scan().setCaching(80))) {
+        for (int i = 0; i < 100; i++) {
+          Result result = scanner.next();
+          assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUALIFIER));
+        }
+      }
+      return null;
+    }, WAIT_INTERVAL_NANOS, MAX_MULTIPLIER_EXPECTATION * WAIT_INTERVAL_NANOS);
+  }
 }
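The three itDoesNotMultiply* tests above force RETRY_COUNT retries via RegionTooBusyException and then throttle once; if the throttle pause were wrongly scaled by the retry count, the total wait would approach HConstants.RETRY_BACKOFF[RETRY_COUNT] times the server's interval, so they assert the observed cost stays under MAX_MULTIPLIER_EXPECTATION times it instead. A small sketch of that bound (assumes hbase-common on the classpath; the values mirror the test constants):

  import java.util.concurrent.TimeUnit;
  import org.apache.hadoop.hbase.HConstants;

  public class ThrottleBoundSketch {
    public static void main(String[] args) {
      long waitIntervalNs = TimeUnit.SECONDS.toNanos(1); // WAIT_INTERVAL_NANOS
      int retryCount = 3;                                // RETRY_COUNT
      int maxMultiplier = 2;                             // MAX_MULTIPLIER_EXPECTATION
      // A multiplied pause would look like RETRY_BACKOFF[retryCount] intervals or more;
      // the tests demand the observed cost stays inside (1x, 2x) the interval.
      System.out.println("backoff multiplier at retry " + retryCount + ": "
        + HConstants.RETRY_BACKOFF[retryCount]);
      System.out.println("allowed window: (" + waitIntervalNs + ", "
        + maxMultiplier * waitIntervalNs + ") ns");
    }
  }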
