This is an automated email from the ASF dual-hosted git repository.

rmattingly pushed a commit to branch HBASE-29351-branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 28c541f5a9b145a18af1c085aa111678615547dc
Author: Ray Mattingly <[email protected]>
AuthorDate: Wed Oct 22 13:48:16 2025 -0400

    HBASE-29351 Quotas: adaptive wait intervals (#7396)
    
    Co-authored-by: Ray Mattingly <[email protected]>
    Signed-off-by: Charles Connell <[email protected]>
---
 .../hbase/quotas/FeedbackAdaptiveRateLimiter.java  | 361 ++++++++++++++
 .../hadoop/hbase/quotas/TimeBasedLimiter.java      |  25 +-
 .../quotas/TestFeedbackAdaptiveRateLimiter.java    | 542 +++++++++++++++++++++
 3 files changed, 923 insertions(+), 5 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java
new file mode 100644
index 00000000000..6acfd07328e
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FeedbackAdaptiveRateLimiter.java
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicDouble;
+
+/**
+ * An adaptive rate limiter that dynamically adjusts its behavior based on 
observed usage patterns
+ * to achieve stable, full utilization of configured quota allowances while 
managing client
+ * contention.
+ * <p>
+ * <b>Core Algorithm:</b> This rate limiter divides time into fixed refill 
intervals (configurable
+ * via {@code hbase.quota.rate.limiter.refill.interval.ms}, default is 1 
refill per TimeUnit of the
+ * RateLimiter). At the beginning of each interval, a fresh allocation of 
resources becomes
+ * available based on the configured limit. Clients consume resources as they 
make requests. When
+ * resources are exhausted, clients must wait until the next refill, or until 
enough resources
+ * become available.
+ * <p>
+ * <b>Adaptive Backpressure:</b> When multiple threads compete for limited 
resources (contention),
+ * this limiter detects the contention and applies increasing backpressure by 
extending wait
+ * intervals. This prevents thundering herd behavior where many threads wake 
simultaneously and
+ * compete for the same resources. The backoff multiplier increases by a small 
increment (see
+ * {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}) per interval when 
contention occurs, and
+ * decreases (see {@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}) 
when no contention is
+ * detected, converging toward optimal throughput. The multiplier is capped at 
a maximum value (see
+ * {@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}) to prevent unbounded 
waits.
+ * <p>
+ * Contention is detected when {@link #getWaitInterval} is called with insufficient available
+ * resources (i.e., {@code amount > available}), indicating a thread needs to wait for resources.
+ * If this occurs at any point during a refill interval, the limiter marks that interval as
+ * contended and increases backpressure at the next refill.
+ * <p>
+ * <b>Oversubscription for Full Utilization:</b> In practice, synchronization 
overhead and timing
+ * variations often prevent clients from consuming exactly their full 
allowance, resulting in
+ * consistent under-utilization. This limiter addresses this by tracking 
utilization via an
+ * exponentially weighted moving average (EWMA). When average utilization 
falls below the target
+ * range (determined by {@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}), 
the limiter gradually
+ * increases the oversubscription proportion (see
+ * {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}), allowing more 
resources per interval than
+ * the base limit. Conversely, when utilization exceeds the target range, 
oversubscription is
+ * decreased (see {@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}). 
Oversubscription is capped
+ * (see {@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}) to prevent excessive 
bursts while still
+ * enabling consistent full utilization.
+ * <p>
+ * <b>Example Scenario:</b> Consider a quota of 1000 requests per second with 
a 1-second refill
+ * interval. Without oversubscription, clients might typically achieve only 
950 req/s due to
+ * coordination delays. This limiter would detect the under-utilization, 
gradually increase
+ * oversubscription, allowing slightly more resources per interval, which 
compensates for
+ * inefficiencies and achieves stable throughput closer to the configured 
quota. If multiple threads
+ * simultaneously try to consume resources and repeatedly wait, the backoff 
multiplier increases
+ * their wait times, spreading out their retry attempts and reducing wasted 
CPU cycles.
+ * <p>
+ * <b>Configuration Parameters:</b>
+ * <ul>
+ * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT}: Controls rate 
of backpressure
+ * increase</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT}: Controls rate 
of backpressure
+ * decrease</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER}: Caps the maximum 
wait time extension</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT}: Controls rate of 
oversubscription
+ * increase</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT}: Controls rate of 
oversubscription
+ * decrease</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION}: Caps the maximum burst 
capacity</li>
+ * <li>{@link #FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET}: Defines the 
acceptable range around full
+ * utilization</li>
+ * </ul>
+ * <p>
+ * This algorithm converges toward stable operation where: (1) wait intervals 
are just long enough
+ * to prevent excessive contention, and (2) oversubscription is just high 
enough to achieve
+ * consistent full utilization of the configured allowance.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FeedbackAdaptiveRateLimiter extends RateLimiter {
+
+  /**
+   * Amount to increase the backoff multiplier when contention is detected per refill interval. In
+   * other words, if any request had to be throttled during a refill interval, then we will
+   * increase our wait intervals (increase backpressure, decrease throughput).
+   */
+  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.increment";
+  public static final double DEFAULT_BACKOFF_MULTIPLIER_INCREMENT = 0.0005;
+
+  /**
+   * Amount to decrease the backoff multiplier when no contention is detected per refill interval.
+   * In other words, if no request had to be throttled during a refill interval, then we will
+   * decrease our wait interval (decrease backpressure, increase throughput).
+   */
+  public static final String FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.backoff.multiplier.decrement";
+  public static final double DEFAULT_BACKOFF_MULTIPLIER_DECREMENT = 0.0001;
+
+  /**
+   * Maximum ceiling for the backoff multiplier to avoid unbounded waits.
+   */
+  public static final String FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER =
+    "hbase.quota.rate.limiter.feedback.adaptive.max.backoff.multiplier";
+  public static final double DEFAULT_MAX_BACKOFF_MULTIPLIER = 10.0;
+
+  /**
+   * Amount to increase the oversubscription proportion when utilization is 
below (1.0-errorBudget).
+   */
+  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.increment";
+  public static final double DEFAULT_OVERSUBSCRIPTION_INCREMENT = 0.001;
+
+  /**
+   * Amount to decrease the oversubscription proportion when utilization 
exceeds (1.0+errorBudget).
+   */
+  public static final String FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT =
+    "hbase.quota.rate.limiter.feedback.adaptive.oversubscription.decrement";
+  public static final double DEFAULT_OVERSUBSCRIPTION_DECREMENT = 0.00005;
+
+  /**
+   * Maximum ceiling for oversubscription to prevent unbounded bursts. Some 
oversubscription can be
+   * nice, because it allows you to balance the inefficiency and latency of 
retries, landing on
+   * stable usage at approximately your configured allowance. Without adequate 
oversubscription,
+   * your steady state may often seem significantly, and suspiciously, lower 
than your configured
+   * allowance.
+   */
+  public static final String FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION =
+    "hbase.quota.rate.limiter.feedback.adaptive.max.oversubscription";
+  public static final double DEFAULT_MAX_OVERSUBSCRIPTION = 0.25;
+
+  /**
+   * Acceptable deviation around full utilization (1.0) for adjusting 
oversubscription. If stable
+   * throttle usage is typically under (1.0-errorBudget), then we will allow 
more oversubscription.
+   * If stable throttle usage is typically over (1.0+errorBudget), then we 
will pull back
+   * oversubscription.
+   */
+  public static final String FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET =
+    "hbase.quota.rate.limiter.feedback.adaptive.utilization.error.budget";
+  public static final double DEFAULT_UTILIZATION_ERROR_BUDGET = 0.025;
+
+  private static final int WINDOW_TIME_MS = 60_000;
+
+  /**
+   * Reads the feedback-adaptive tuning parameters from a {@link Configuration} once and then
+   * hands out identically configured {@link FeedbackAdaptiveRateLimiter} instances.
+   */
+  public static class FeedbackAdaptiveRateLimiterFactory {
+
+    private final long refillInterval;
+    private final double backoffMultiplierIncrement;
+    private final double backoffMultiplierDecrement;
+    private final double maxBackoffMultiplier;
+    private final double oversubscriptionIncrement;
+    private final double oversubscriptionDecrement;
+    private final double maxOversubscription;
+    private final double utilizationErrorBudget;
+
+    /**
+     * Captures every tunable from {@code conf}. Keys that are not set fall back to the
+     * {@code DEFAULT_*} constants of the enclosing class, or to
+     * {@code RateLimiter.DEFAULT_TIME_UNIT} for the refill interval.
+     * @param conf configuration to read the limiter settings from
+     */
+    public FeedbackAdaptiveRateLimiterFactory(Configuration conf) {
+      this.refillInterval = conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
+        RateLimiter.DEFAULT_TIME_UNIT);
+
+      // Backoff (wait interval) tuning
+      this.backoffMultiplierIncrement = conf.getDouble(
+        FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, DEFAULT_BACKOFF_MULTIPLIER_INCREMENT);
+      this.backoffMultiplierDecrement = conf.getDouble(
+        FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT, DEFAULT_BACKOFF_MULTIPLIER_DECREMENT);
+      this.maxBackoffMultiplier =
+        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, DEFAULT_MAX_BACKOFF_MULTIPLIER);
+
+      // Oversubscription tuning
+      this.oversubscriptionIncrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT,
+        DEFAULT_OVERSUBSCRIPTION_INCREMENT);
+      this.oversubscriptionDecrement = conf.getDouble(FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT,
+        DEFAULT_OVERSUBSCRIPTION_DECREMENT);
+      this.maxOversubscription =
+        conf.getDouble(FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, DEFAULT_MAX_OVERSUBSCRIPTION);
+      this.utilizationErrorBudget = conf.getDouble(FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET,
+        DEFAULT_UTILIZATION_ERROR_BUDGET);
+    }
+
+    /**
+     * Builds a new limiter from the captured settings.
+     * @return a fresh limiter; instances share no mutable state with one another
+     */
+    public FeedbackAdaptiveRateLimiter create() {
+      return new FeedbackAdaptiveRateLimiter(refillInterval, backoffMultiplierIncrement,
+        backoffMultiplierDecrement, maxBackoffMultiplier, oversubscriptionIncrement,
+        oversubscriptionDecrement, maxOversubscription, utilizationErrorBudget);
+    }
+  }
+
+  private volatile long nextRefillTime = -1L;
+  private final long refillInterval;
+  private final double backoffMultiplierIncrement;
+  private final double backoffMultiplierDecrement;
+  private final double maxBackoffMultiplier;
+  private final double oversubscriptionIncrement;
+  private final double oversubscriptionDecrement;
+  private final double maxOversubscription;
+  private final double minTargetUtilization;
+  private final double maxTargetUtilization;
+
+  // Adaptive backoff state
+  private final AtomicDouble currentBackoffMultiplier = new AtomicDouble(1.0);
+  private volatile boolean hadContentionThisInterval = false;
+
+  // Over-subscription proportion state
+  private final AtomicDouble oversubscriptionProportion = new 
AtomicDouble(0.0);
+
+  // EWMA tracking
+  private final double emaAlpha;
+  private volatile double utilizationEma = 0.0;
+  private final AtomicLong lastIntervalConsumed;
+
+  /**
+   * Creates a limiter with the given tuning parameters. All arguments are validated up front so
+   * that a bad configuration fails at construction time instead of misbehaving later.
+   * @param refillInterval             milliseconds between refills; must be positive and no
+   *                                   longer than this limiter's time unit in milliseconds
+   * @param backoffMultiplierIncrement step added to the backoff multiplier after a contended
+   *                                   interval; must be &gt; 0
+   * @param backoffMultiplierDecrement step removed from the backoff multiplier after an
+   *                                   uncontended interval; must be &gt; 0
+   * @param maxBackoffMultiplier       ceiling for the backoff multiplier; must be &gt; 1
+   * @param oversubscriptionIncrement  step added to the oversubscription proportion when average
+   *                                   utilization is below target; must be &gt;= 0
+   * @param oversubscriptionDecrement  step removed from the oversubscription proportion when
+   *                                   average utilization is above target; must be &gt;= 0
+   * @param maxOversubscription        ceiling for the oversubscription proportion; must be
+   *                                   &gt;= 0
+   * @param utilizationErrorBudget     accepted deviation around full utilization; must be in
+   *                                   (0, 1]
+   * @throws IllegalArgumentException if any argument is outside its documented range
+   */
+  FeedbackAdaptiveRateLimiter(long refillInterval, double backoffMultiplierIncrement,
+    double backoffMultiplierDecrement, double maxBackoffMultiplier,
+    double oversubscriptionIncrement, double oversubscriptionDecrement, double maxOversubscription,
+    double utilizationErrorBudget) {
+    super();
+    // refill() divides by the refill interval, so a non-positive value must never get through.
+    Preconditions.checkArgument(refillInterval > 0,
+      String.format("Refill interval %s must be > 0", refillInterval));
+    Preconditions.checkArgument(getTimeUnitInMillis() >= refillInterval, String.format(
+      "Refill interval %s must be ≤ TimeUnit millis %s", refillInterval, getTimeUnitInMillis()));
+
+    Preconditions.checkArgument(backoffMultiplierIncrement > 0.0,
+      String.format("Backoff multiplier increment %s must be > 0.0", backoffMultiplierIncrement));
+    Preconditions.checkArgument(backoffMultiplierDecrement > 0.0,
+      String.format("Backoff multiplier decrement %s must be > 0.0", backoffMultiplierDecrement));
+    Preconditions.checkArgument(maxBackoffMultiplier > 1.0,
+      String.format("Max backoff multiplier %s must be > 1.0", maxBackoffMultiplier));
+
+    // Negative oversubscription settings would invert the controller: a negative ceiling, for
+    // example, pushes the proportion below zero and shrinks the limit under the configured
+    // quota. Zero stays legal so oversubscription can be switched off.
+    Preconditions.checkArgument(oversubscriptionIncrement >= 0.0,
+      String.format("Oversubscription increment %s must be >= 0.0", oversubscriptionIncrement));
+    Preconditions.checkArgument(oversubscriptionDecrement >= 0.0,
+      String.format("Oversubscription decrement %s must be >= 0.0", oversubscriptionDecrement));
+    Preconditions.checkArgument(maxOversubscription >= 0.0,
+      String.format("Max oversubscription %s must be >= 0.0", maxOversubscription));
+
+    Preconditions.checkArgument(utilizationErrorBudget > 0.0 && utilizationErrorBudget <= 1.0,
+      String.format("Utilization error budget %s must be between 0.0 and 1.0",
+        utilizationErrorBudget));
+
+    this.refillInterval = refillInterval;
+    this.backoffMultiplierIncrement = backoffMultiplierIncrement;
+    this.backoffMultiplierDecrement = backoffMultiplierDecrement;
+    this.maxBackoffMultiplier = maxBackoffMultiplier;
+    this.oversubscriptionIncrement = oversubscriptionIncrement;
+    this.oversubscriptionDecrement = oversubscriptionDecrement;
+    this.maxOversubscription = maxOversubscription;
+    // The target band is symmetric around full utilization (1.0).
+    this.minTargetUtilization = 1.0 - utilizationErrorBudget;
+    this.maxTargetUtilization = 1.0 + utilizationErrorBudget;
+
+    // Smoothing factor for the utilization moving average, derived from the refill interval
+    // relative to the WINDOW_TIME_MS averaging window.
+    this.emaAlpha = refillInterval / (double) (WINDOW_TIME_MS + refillInterval);
+    this.lastIntervalConsumed = new AtomicLong(0);
+  }
+
+  /**
+   * Advances the limiter to the current refill interval and reports how much to add.
+   * <p>
+   * The first call only arms the refill clock and grants one full oversubscribed limit. A call
+   * made before the next refill time returns 0. Otherwise the utilization average is updated
+   * from what was consumed since the previous refill, the backoff multiplier and the
+   * oversubscription proportion are each moved one step, the per-interval bookkeeping is reset,
+   * and the refill amount is returned.
+   * @param limit the configured limit per time unit
+   * @return the amount to refill, never more than the oversubscribed limit
+   */
+  @Override
+  public long refill(long limit) {
+    final long currentTime = EnvironmentEdgeManager.currentTime();
+    if (nextRefillTime == -1) {
+      // Very first refill: start the interval clock and grant a full allowance.
+      nextRefillTime = currentTime + refillInterval;
+      hadContentionThisInterval = false;
+      return getOversubscribedLimit(limit);
+    }
+    if (currentTime < nextRefillTime) {
+      return 0;
+    }
+
+    // Number of whole refill intervals that fit between the previous refill and now.
+    final long elapsedIntervals = (refillInterval + currentTime - nextRefillTime) / refillInterval;
+    nextRefillTime = currentTime + refillInterval;
+
+    final long perIntervalLimit = getRefillIntervalAdjustedLimit(limit);
+    final long consumed = lastIntervalConsumed.get();
+    if (perIntervalLimit > 0 && consumed > 0) {
+      // Intervals in which nothing was consumed leave the moving average untouched.
+      final double utilization = (double) consumed / perIntervalLimit;
+      utilizationEma = emaAlpha * utilization + (1.0 - emaAlpha) * utilizationEma;
+    }
+
+    // Contention in the finished interval raises the backoff; a quiet interval lowers it.
+    final double backoff = currentBackoffMultiplier.get();
+    final double adjustedBackoff = hadContentionThisInterval
+      ? Math.min(backoff + backoffMultiplierIncrement, maxBackoffMultiplier)
+      : Math.max(backoff - backoffMultiplierDecrement, 1.0);
+    currentBackoffMultiplier.set(adjustedBackoff);
+
+    // Steer oversubscription toward the target utilization band; inside the band it is kept.
+    final double averageUtilization = utilizationEma;
+    final double oversubscription = oversubscriptionProportion.get();
+    if (averageUtilization < minTargetUtilization) {
+      oversubscriptionProportion
+        .set(Math.min(oversubscription + oversubscriptionIncrement, maxOversubscription));
+    } else if (averageUtilization >= maxTargetUtilization) {
+      oversubscriptionProportion.set(Math.max(oversubscription - oversubscriptionDecrement, 0.0));
+    }
+
+    // Start the new interval with clean per-interval bookkeeping.
+    lastIntervalConsumed.set(0);
+    hadContentionThisInterval = false;
+
+    return Math.min(getOversubscribedLimit(limit), elapsedIntervals * perIntervalLimit);
+  }
+
+  /**
+   * Returns {@code limit} enlarged by the current oversubscription proportion. The extra amount
+   * is truncated toward zero.
+   */
+  private long getOversubscribedLimit(long limit) {
+    final long extra = (long) (limit * oversubscriptionProportion.get());
+    return limit + extra;
+  }
+
+  /**
+   * Consumes {@code amount} through the parent limiter and adds it to the running total for the
+   * current refill interval, which {@link #refill(long)} reads to measure utilization.
+   * @param amount the amount to consume
+   */
+  @Override
+  public void consume(long amount) {
+    super.consume(amount);
+    lastIntervalConsumed.getAndAdd(amount);
+  }
+
+  /**
+   * Computes how long a caller should wait before {@code amount} can be satisfied, scaled by the
+   * current backoff multiplier.
+   * <p>
+   * Calling this with {@code amount > available} also marks the current refill interval as
+   * contended, which makes the next {@link #refill(long)} raise the backoff multiplier.
+   * @param limit     the configured limit per time unit
+   * @param available the amount currently available
+   * @param amount    the amount requested
+   * @return 0 if no refill has happened yet; otherwise the time until enough refills have
+   *         occurred to cover the shortfall, multiplied by the backoff multiplier
+   */
+  @Override
+  public long getWaitInterval(long limit, long available, long amount) {
+    final long perIntervalLimit = getRefillIntervalAdjustedLimit(limit);
+    if (nextRefillTime == -1) {
+      return 0;
+    }
+
+    final long now = EnvironmentEdgeManager.currentTime();
+    final long untilNextRefill = nextRefillTime - now;
+
+    final long shortfall = amount - available;
+    if (shortfall > 0) {
+      hadContentionThisInterval = true;
+    }
+
+    long baseWait = untilNextRefill;
+    if (shortfall > perIntervalLimit) {
+      // The next refill alone cannot cover the shortfall; add the further refills required.
+      long extraRefills = shortfall / perIntervalLimit;
+      if (shortfall % perIntervalLimit == 0) {
+        extraRefills--;
+      }
+      baseWait += extraRefills * refillInterval;
+    }
+    return applyBackoffMultiplier(baseWait);
+  }
+
+  /**
+   * Scales {@code limit}, which is expressed per time unit, down to a single refill interval,
+   * rounding up.
+   */
+  private long getRefillIntervalAdjustedLimit(long limit) {
+    final double intervalFraction = refillInterval / (double) getTimeUnitInMillis();
+    return (long) Math.ceil(intervalFraction * limit);
+  }
+
+  /**
+   * Scales {@code baseWaitInterval} by the current backoff multiplier, truncating the result to
+   * a whole number.
+   */
+  private long applyBackoffMultiplier(long baseWaitInterval) {
+    final double multiplier = currentBackoffMultiplier.get();
+    return (long) (baseWaitInterval * multiplier);
+  }
+
+  /**
+   * Overrides the time at which the next refill becomes due. Strictly for testing.
+   * @param nextRefillTime the new next-refill timestamp
+   */
+  @Override
+  public void setNextRefillTime(long nextRefillTime) {
+    this.nextRefillTime = nextRefillTime;
+  }
+
+  /**
+   * Returns the time at which the next refill becomes due, or -1 if no refill has happened yet.
+   */
+  @Override
+  public long getNextRefillTime() {
+    return nextRefillTime;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 43dfab703b7..38d171f1bf9 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -46,11 +46,10 @@ public class TimeBasedLimiter implements QuotaLimiter {
   private RateLimiter reqHandlerUsageTimeLimiter = null;
 
   private TimeBasedLimiter(Configuration conf) {
-    if (
-      FixedIntervalRateLimiter.class.getName().equals(
-        conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, 
AverageIntervalRateLimiter.class)
-          .getName())
-    ) {
+    String limiterClassName =
+      conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, 
AverageIntervalRateLimiter.class)
+        .getName();
+    if (FixedIntervalRateLimiter.class.getName().equals(limiterClassName)) {
       long refillInterval = 
conf.getLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS,
         RateLimiter.DEFAULT_TIME_UNIT);
       reqsLimiter = new FixedIntervalRateLimiter(refillInterval);
@@ -66,6 +65,22 @@ public class TimeBasedLimiter implements QuotaLimiter {
       atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
       atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval);
       reqHandlerUsageTimeLimiter = new 
FixedIntervalRateLimiter(refillInterval);
+    } else if 
(FeedbackAdaptiveRateLimiter.class.getName().equals(limiterClassName)) {
+      FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory 
feedbackLimiterFactory =
+        new 
FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
+      reqsLimiter = feedbackLimiterFactory.create();
+      reqSizeLimiter = feedbackLimiterFactory.create();
+      writeReqsLimiter = feedbackLimiterFactory.create();
+      writeSizeLimiter = feedbackLimiterFactory.create();
+      readReqsLimiter = feedbackLimiterFactory.create();
+      readSizeLimiter = feedbackLimiterFactory.create();
+      reqCapacityUnitLimiter = feedbackLimiterFactory.create();
+      writeCapacityUnitLimiter = feedbackLimiterFactory.create();
+      readCapacityUnitLimiter = feedbackLimiterFactory.create();
+      atomicReqLimiter = feedbackLimiterFactory.create();
+      atomicReadSizeLimiter = feedbackLimiterFactory.create();
+      atomicWriteSizeLimiter = feedbackLimiterFactory.create();
+      reqHandlerUsageTimeLimiter = feedbackLimiterFactory.create();
     } else {
       reqsLimiter = new AverageIntervalRateLimiter();
       reqSizeLimiter = new AverageIntervalRateLimiter();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java
new file mode 100644
index 00000000000..a6a233205c8
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestFeedbackAdaptiveRateLimiter.java
@@ -0,0 +1,542 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Verify the behavior of the FeedbackAdaptiveRateLimiter including adaptive 
backoff multipliers and
+ * over-subscription functionality.
+ */
+@Category({ RegionServerTests.class, SmallTests.class })
+public class TestFeedbackAdaptiveRateLimiter {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestFeedbackAdaptiveRateLimiter.class);
+
+  private ManualEnvironmentEdge testEdge;
+  private FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory 
factory;
+
+  /**
+   * Installs a manually driven clock and builds a limiter factory with a 500 ms refill interval
+   * and adaptive steps far larger than the defaults, so a handful of intervals is enough to see
+   * the multiplier and the oversubscription proportion move.
+   */
+  @Before
+  public void setUp() {
+    testEdge = new ManualEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(testEdge);
+
+    final Configuration conf = HBaseConfiguration.create();
+    conf.setLong(FixedIntervalRateLimiter.RATE_LIMITER_REFILL_INTERVAL_MS, 500);
+
+    // Backoff: +0.1 per contended interval, -0.05 per quiet interval, capped at 3.0
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_INCREMENT, 0.1);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_BACKOFF_MULTIPLIER_DECREMENT,
+      0.05);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_BACKOFF_MULTIPLIER, 3.0);
+
+    // Oversubscription: +0.01 / -0.005 per interval, capped at 0.2, target band 1.0 +/- 0.1
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_INCREMENT, 0.01);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_OVERSUBSCRIPTION_DECREMENT, 0.005);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_MAX_OVERSUBSCRIPTION, 0.2);
+    conf.setDouble(FeedbackAdaptiveRateLimiter.FEEDBACK_ADAPTIVE_UTILIZATION_ERROR_BUDGET, 0.1);
+
+    factory = new FeedbackAdaptiveRateLimiter.FeedbackAdaptiveRateLimiterFactory(conf);
+  }
+
+  /** Removes the injected test clock so later tests see the default environment edge again. */
+  @After
+  public void tearDown() {
+    EnvironmentEdgeManager.reset();
+  }
+
+  /** A fresh limiter admits requests up to its limit without waiting and throttles afterwards. */
+  @Test
+  public void testBasicFunctionality() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    // Two half-limit chunks: each must be admitted without any wait.
+    for (int chunk = 0; chunk < 2; chunk++) {
+      assertEquals(0, limiter.getWaitIntervalMs());
+      limiter.consume(5);
+    }
+
+    // The full limit is now used up, so the next request has to wait.
+    assertTrue(limiter.getWaitIntervalMs() > 0);
+  }
+
+  /** Sustained contention across refill intervals must lengthen the returned wait interval. */
+  @Test
+  public void testAdaptiveBackoffIncreases() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Baseline: the wait reported before any backoff has been applied.
+    limiter.consume(10);
+    long baselineWait = limiter.getWaitInterval(10, 0, 1);
+    assertTrue("Initial wait interval should be positive", baselineWait > 0);
+
+    // Five consecutive intervals (t = 1500 .. 3500), each ending in a request that must wait.
+    for (int time = 1500; time <= 3500; time += 500) {
+      testEdge.setValue(time);
+      limiter.refill(10);
+      limiter.consume(10);
+      limiter.getWaitInterval(10, 0, 1);
+    }
+
+    testEdge.setValue(4000);
+    limiter.refill(10);
+    limiter.consume(10);
+    long contendedWait = limiter.getWaitInterval(10, 0, 1);
+
+    // Each contended interval adds 0.1 to the multiplier, so the wait must be clearly longer.
+    assertTrue(
+      "Wait interval should increase with contention. Initial: " + baselineWait
+        + ", After contention: " + contendedWait,
+      contendedWait > baselineWait * 1.3);
+  }
+
+  /** Intervals without contention must shrink a previously elevated wait interval again. */
+  @Test
+  public void testAdaptiveBackoffDecreases() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Raise the multiplier: five intervals (refills at t = 1500 .. 3500) that each saw a
+    // throttled request before the refill.
+    for (int time = 1500; time <= 3500; time += 500) {
+      limiter.consume(10);
+      limiter.getWaitInterval(10, 0, 1);
+      testEdge.setValue(time);
+      limiter.refill(10);
+    }
+
+    limiter.consume(10);
+    long elevatedWait = limiter.getWaitInterval(10, 0, 1);
+
+    // Ten intervals (t = 4000 .. 8500) in which nobody asks for a wait interval.
+    for (int time = 4000; time <= 8500; time += 500) {
+      testEdge.setValue(time);
+      limiter.refill(10);
+      limiter.consume(3);
+    }
+
+    testEdge.setValue(9500);
+    limiter.refill(10);
+    limiter.consume(10);
+    long relaxedWait = limiter.getWaitInterval(10, 0, 1);
+
+    // Each quiet interval removes 0.05 from the multiplier, so the wait must be clearly shorter.
+    assertTrue("Wait interval should decrease without contention. Elevated: " + elevatedWait
+      + ", Reduced: " + relaxedWait, relaxedWait < elevatedWait * 0.9);
+  }
+
+  /**
+   * Persistently low utilization must build up oversubscription, which shows as a multi-interval
+   * refill larger than the base limit.
+   */
+  @Test
+  public void testOversubscriptionIncreasesWithLowUtilization() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+
+    // The very first refill hands out the plain limit because nothing has adapted yet.
+    long firstRefill = limiter.refill(10);
+    assertEquals("Initial refill should match limit", 10, firstRefill);
+
+    // Thirty intervals (refills at t = 1500 .. 16000) at 2 of 5 consumed, i.e. 40% utilization.
+    // That is far below the 0.9 lower target, so every refill raises oversubscription.
+    // Consumption happens before the clock moves so the refill sees it.
+    for (int time = 1500; time <= 16000; time += 500) {
+      limiter.consume(2);
+      testEdge.setValue(time);
+      limiter.refill(10);
+    }
+
+    // Drain whatever is left so the measurement below starts from a clean slate.
+    limiter.consume((int) limiter.getAvailable());
+
+    // Skip three refill intervals at once: 3 * 5 = 15 exceeds the oversubscribed limit, so the
+    // refill is bounded by that limit (10 * 1.2 = 12 at the 0.2 cap) instead of the plain 10.
+    testEdge.setValue(16000 + 1500);
+    long refillAfterGap = limiter.refill(10);
+
+    assertTrue("With oversubscription from low utilization, refill should exceed base limit. Got: "
+      + refillAfterGap, refillAfterGap > 10);
+  }
+
+  /**
+   * Sustained utilization above the target band must shrink oversubscription again.
+   * <p>
+   * Oversubscription is only visible through a refill that spans several intervals, because a
+   * single-interval refill is capped at the interval-adjusted limit (5) whatever the
+   * oversubscription proportion is. The test therefore measures a three-interval refill once
+   * after building oversubscription up and once after the high-utilization phase, and compares
+   * the two.
+   */
+  @Test
+  public void testOversubscriptionDecreasesWithHighUtilization() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    int now = 1000;
+    testEdge.setValue(now);
+    limiter.refill(10);
+
+    // Phase 1: 25 intervals at 2 of 5 consumed (40% utilization) push oversubscription to its
+    // 0.2 cap (0.01 per interval). Consume before advancing the clock so the refill sees it.
+    for (int i = 0; i < 25; i++) {
+      limiter.consume(2);
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+    }
+
+    // Three intervals at once: 3 * 5 = 15 exceeds the oversubscribed limit, so the refill
+    // reports that limit.
+    now += 1500;
+    testEdge.setValue(now);
+    long refillWithOversubscription = limiter.refill(10);
+    assertTrue("Low utilization should have built up oversubscription. Got: "
+      + refillWithOversubscription, refillWithOversubscription > 10);
+
+    // Phase 2: consume 10 of 5 per interval (200% utilization). The utilization average moves
+    // slowly (smoothing factor 500 / 60500), so it takes roughly 90 intervals to climb past the
+    // 1.1 upper target; from then on every refill removes 0.005. 200 intervals leave ample room
+    // for the proportion to fall all the way from 0.2.
+    for (int i = 0; i < 200; i++) {
+      limiter.consume(10);
+      now += 500;
+      testEdge.setValue(now);
+      limiter.refill(10);
+    }
+
+    now += 1500;
+    testEdge.setValue(now);
+    long refillAfterHighUtil = limiter.refill(10);
+
+    assertTrue(
+      "Refill should be closer to base after high utilization. Got: " + refillAfterHighUtil,
+      refillAfterHighUtil < refillWithOversubscription);
+  }
+
+  /** However long contention lasts, the wait interval must stop growing at the max multiplier. */
+  @Test
+  public void testBackoffMultiplierCapsAtMaximum() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Reference wait before any backoff has been applied.
+    limiter.consume(10);
+    long baseWait = limiter.getWaitInterval(10, 0, 1);
+
+    // 25 contended intervals (t = 1500 .. 13500). Only (3.0 - 1.0) / 0.1 = 20 are needed to
+    // reach the configured maximum, so the multiplier is pinned at the cap afterwards.
+    for (int time = 1500; time <= 13500; time += 500) {
+      testEdge.setValue(time);
+      limiter.refill(10);
+      limiter.consume(10);
+      limiter.getWaitInterval(10, 0, 1);
+    }
+
+    testEdge.setValue(14000);
+    limiter.refill(10);
+    limiter.consume(10);
+    long cappedWait = limiter.getWaitInterval(10, 0, 1);
+
+    // Roughly three times the base wait, matching the max multiplier of 3.0.
+    assertTrue(
+      "Wait interval should cap at max multiplier. Base: " + baseWait + ", Max backoff: "
+        + cappedWait,
+      cappedWait >= baseWait * 2.5 && cappedWait <= baseWait * 3.5);
+
+    // One more contended interval ...
+    testEdge.setValue(14500);
+    limiter.refill(10);
+    limiter.consume(10);
+    limiter.getWaitInterval(10, 0, 1);
+
+    // ... must not move the wait any further.
+    testEdge.setValue(15000);
+    limiter.refill(10);
+    limiter.consume(10);
+    long waitAfterMoreContention = limiter.getWaitInterval(10, 0, 1);
+
+    assertTrue(
+      "Wait should remain capped. Previous: " + cappedWait + ", Current: "
+        + waitAfterMoreContention,
+      Math.abs(waitAfterMoreContention - cappedWait) < baseWait * 0.2);
+  }
+
+  /**
+   * Keeps utilization very low for many intervals and checks that the refill amount stops
+   * growing: it must be at most 7 and must be unchanged after one more low-utilization tick.
+   */
+  @Test
+  public void testOversubscriptionCapsAtMaximum() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // 25 ticks, 500ms apart (clock 1500..13500), consuming a single unit per tick.
+    for (int now = 1500; now <= 13500; now += 500) {
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(1);
+    }
+
+    // First sample of the refill amount after the long low-utilization run.
+    testEdge.setValue(14000);
+    long refillWithMaxOversubscription = limiter.refill(10);
+
+    boolean withinCap = refillWithMaxOversubscription <= 7;
+    assertTrue(
+      "Refill should cap at max oversubscription. Got: " + refillWithMaxOversubscription,
+      withinCap);
+
+    // One more low-utilization tick ...
+    testEdge.setValue(14500);
+    limiter.refill(10);
+    limiter.consume(1);
+
+    // ... after which the refill amount is sampled again and must match the first sample.
+    testEdge.setValue(15000);
+    long stillMaxRefill = limiter.refill(10);
+
+    assertEquals("Refill should remain at max oversubscription",
+      refillWithMaxOversubscription, stillMaxRefill);
+  }
+
+  /**
+   * Advances the clock by 1500ms in a single jump after draining the limiter and checks that
+   * the next refill grants at least 10.
+   */
+  @Test
+  public void testMultipleRefillIntervals() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    // Fill and fully drain at the starting clock value.
+    testEdge.setValue(1000);
+    limiter.refill(10);
+    limiter.consume(10);
+
+    // Skip ahead 1500ms in one step (three 500ms steps, as used by the other tests here).
+    testEdge.setValue(1000 + 1500);
+    long multiIntervalRefill = limiter.refill(10);
+
+    String message = "Multiple interval refill should provide multiple refill amounts. Got: "
+      + multiIntervalRefill;
+    assertTrue(message, multiIntervalRefill >= 10);
+  }
+
+  /**
+   * Checks the two basic refill amounts: the very first refill grants the full limit (10), and
+   * a refill 500ms after draining grants 5.
+   */
+  @Test
+  public void testRefillIntervalAdjustment() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+
+    // Initial refill.
+    long firstRefill = limiter.refill(10);
+    assertEquals("First refill should give full limit", 10, firstRefill);
+
+    // Drain, then move the clock forward by 500ms, i.e. half of the one-second time unit.
+    limiter.consume(10);
+    testEdge.setValue(1000 + 500);
+
+    long adjustedRefill = limiter.refill(10);
+    assertEquals("Refill after one interval should be interval-adjusted", 5, adjustedRefill);
+  }
+
+  /**
+   * Runs many intervals without contention and checks that the wait interval reported
+   * afterwards equals the wait interval recorded at the start.
+   */
+  @Test
+  public void testBackoffMultiplierBottomsAtOne() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Baseline: drain once and record the wait interval before any backoff could accumulate.
+    limiter.consume(10);
+    long baselineWait = limiter.getWaitInterval(10, 0, 1);
+
+    // 20 ticks, 500ms apart (clock 1500..11000), consuming 3 per tick and never asking for a
+    // wait interval, so no contention is registered.
+    for (int now = 1500; now <= 11000; now += 500) {
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(3);
+    }
+
+    // Drain again and re-measure.
+    testEdge.setValue(11500);
+    limiter.refill(10);
+    limiter.consume(10);
+    long noContentionWait = limiter.getWaitInterval(10, 0, 1);
+
+    assertEquals("Wait interval should not go below baseline (multiplier=1.0)",
+      baselineWait, noContentionWait);
+  }
+
+  /**
+   * Smoke test for thread safety: ten threads each call {@code consume} and
+   * {@code getWaitInterval} ten times on a shared limiter, and the test fails if any of those
+   * threads terminates with an uncaught exception.
+   * @throws InterruptedException if the test thread is interrupted while joining the workers
+   */
+  @Test
+  public void testConcurrentAccess() throws InterruptedException {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(100, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(100);
+
+    // An exception thrown inside a worker never reaches the JUnit thread on its own, so the
+    // first one is captured here and checked after all workers have been joined.
+    final java.util.concurrent.atomic.AtomicReference<Throwable> firstFailure =
+      new java.util.concurrent.atomic.AtomicReference<>();
+
+    // Simulate concurrent access
+    Thread[] threads = new Thread[10];
+    for (int i = 0; i < threads.length; i++) {
+      threads[i] = new Thread(() -> {
+        for (int j = 0; j < 10; j++) {
+          limiter.consume(1);
+          limiter.getWaitInterval(100, 50, 1);
+        }
+      });
+      threads[i].setUncaughtExceptionHandler(
+        (thread, error) -> firstFailure.compareAndSet(null, error));
+    }
+
+    for (Thread thread : threads) {
+      thread.start();
+    }
+
+    for (Thread thread : threads) {
+      thread.join();
+    }
+
+    // Basic thread safety verification: no worker may have died with an exception.
+    Throwable failure = firstFailure.get();
+    assertTrue("Concurrent access should complete successfully, but a worker failed with: "
+      + failure, failure == null);
+  }
+
+  /**
+   * Consumes twice the limit in one call and checks that the reported wait interval is at
+   * least 500ms.
+   */
+  @Test
+  public void testOverconsumptionBehavior() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Take 20 against a limit of 10.
+    limiter.consume(20);
+
+    // Ask for the wait interval, passing -10 as the available amount.
+    long waitInterval = limiter.getWaitInterval(10, -10, 1);
+
+    boolean substantialWait = waitInterval >= 500;
+    assertTrue("Should require substantial wait after over-consumption", substantialWait);
+  }
+
+  /**
+   * Alternates three times between a contended phase and an uncontended phase and checks, in
+   * every cycle, that the wait interval measured after the uncontended phase is strictly lower
+   * than the one measured after the contended phase.
+   */
+  @Test
+  public void testOscillatingLoadPattern() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    for (int cycle = 0; cycle < 3; cycle++) {
+      // Each cycle spans 3000ms of clock time starting here.
+      int cycleStart = 1000 + (cycle * 3000);
+
+      // Contended phase: three ticks at +0, +500, +1000. Each drains the limiter and then
+      // requests a wait interval, which is what registers contention.
+      for (int step = 0; step < 3; step++) {
+        testEdge.setValue(cycleStart + (step * 500));
+        limiter.refill(10);
+        limiter.consume(10);
+        limiter.getWaitInterval(10, 0, 1);
+      }
+
+      long highContentionWait = limiter.getWaitInterval(10, 0, 1);
+
+      // Uncontended phase: three ticks at +1500, +2000, +2500, consuming 3 and never asking
+      // for a wait interval.
+      for (int step = 0; step < 3; step++) {
+        testEdge.setValue(cycleStart + 1500 + (step * 500));
+        limiter.refill(10);
+        limiter.consume(3);
+      }
+
+      // End of cycle (+3000): drain once more and measure.
+      testEdge.setValue(cycleStart + 3000);
+      limiter.refill(10);
+      limiter.consume(10);
+      long lowContentionWait = limiter.getWaitInterval(10, 0, 1);
+
+      String message = "Wait should decrease after low contention phase in cycle " + cycle
+        + ". High: " + highContentionWait + ", Low: " + lowContentionWait;
+      assertTrue(message, lowContentionWait < highContentionWait);
+    }
+  }
+
+  /**
+   * Exercises the limiter with 30 intervals of partial consumption followed by 30 intervals of
+   * full consumption. The test only verifies that this sequence runs to completion.
+   */
+  @Test
+  public void testUtilizationEmaConvergence() {
+    FeedbackAdaptiveRateLimiter limiter = factory.create();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    testEdge.setValue(1000);
+    limiter.refill(10);
+
+    // Phase 1: 30 ticks, 500ms apart (clock 1500..16000), consuming 4 per tick. Against an
+    // interval-adjusted amount of 5 that is 80% utilization.
+    for (int now = 1500; now <= 16000; now += 500) {
+      testEdge.setValue(now);
+      limiter.refill(10);
+      limiter.consume(4);
+    }
+
+    // Extra refill tick between the two phases.
+    testEdge.setValue(16500);
+    limiter.refill(10);
+
+    // Phase 2: 30 ticks, 500ms apart (clock 17000..31500), consuming exactly what each refill
+    // granted, i.e. 100% utilization.
+    for (int now = 17000; now <= 31500; now += 500) {
+      testEdge.setValue(now);
+      long granted = limiter.refill(10);
+      limiter.consume((int) granted);
+    }
+
+    // Final refill tick after the second phase.
+    testEdge.setValue(32000);
+    limiter.refill(10);
+
+    // NOTE(review): this assertion is a placeholder and can never fail; the refill values
+    // above are not compared against anything. Exact values depend on the EMA convergence rate.
+    assertTrue("Refill behavior should adapt to utilization patterns", true);
+  }
+
+  /**
+   * Clock stub for these tests: {@link #currentTime()} returns whatever value was last passed
+   * to {@link #setValue(long)}, starting at 1000.
+   */
+  private static final class ManualEnvironmentEdge implements EnvironmentEdge {
+    /** The time reported to callers; only changes through {@link #setValue(long)}. */
+    private long now = 1000;
+
+    /**
+     * Sets the time that subsequent {@link #currentTime()} calls will report.
+     * @param time the new clock value
+     */
+    public void setValue(long time) {
+      now = time;
+    }
+
+    /** Returns the most recently set clock value. */
+    @Override
+    public long currentTime() {
+      return now;
+    }
+  }
+}


Reply via email to