This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 794bf1ee97d HBASE-28672 Ensure large batches are not indefinitely 
blocked by quotas (#6003)
794bf1ee97d is described below

commit 794bf1ee97d58283481967a93c35cf528e457178
Author: Ray Mattingly <rmdmattin...@gmail.com>
AuthorDate: Mon Jul 8 19:28:44 2024 -0400

    HBASE-28672 Ensure large batches are not indefinitely blocked by quotas 
(#6003)
    
    Co-authored-by: Ray Mattingly <rmattin...@hubspot.com>
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
---
 .../hadoop/hbase/quotas/DefaultOperationQuota.java |  12 +-
 .../hadoop/hbase/quotas/NoopQuotaLimiter.java      |  20 +++
 .../apache/hadoop/hbase/quotas/QuotaLimiter.java   |  13 ++
 .../hadoop/hbase/quotas/TimeBasedLimiter.java      |  26 ++++
 .../hbase/quotas/TestDefaultOperationQuota.java    | 167 +++++++++++++++++++++
 5 files changed, 236 insertions(+), 2 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index 2e26765a6a1..a387c04e4e5 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -111,8 +111,16 @@ public class DefaultOperationQuota implements 
OperationQuota {
         continue;
       }
 
-      limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed,
-        writeCapacityUnitConsumed, readCapacityUnitConsumed);
+      long maxRequestsToEstimate = limiter.getRequestNumLimit();
+      long maxReadsToEstimate = Math.min(maxRequestsToEstimate, 
limiter.getReadNumLimit());
+      long maxWritesToEstimate = Math.min(maxRequestsToEstimate, 
limiter.getWriteNumLimit());
+      long maxReadSizeToEstimate = Math.min(readConsumed, 
limiter.getReadLimit());
+      long maxWriteSizeToEstimate = Math.min(writeConsumed, 
limiter.getWriteLimit());
+
+      limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites),
+        Math.min(maxWriteSizeToEstimate, writeConsumed), 
Math.min(maxReadsToEstimate, numReads),
+        Math.min(maxReadSizeToEstimate, readConsumed), 
writeCapacityUnitConsumed,
+        readCapacityUnitConsumed);
       readAvailable = Math.min(readAvailable, limiter.getReadAvailable());
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
index cf1e49c12e5..5ece0be2b5a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java
@@ -65,6 +65,21 @@ class NoopQuotaLimiter implements QuotaLimiter {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public long getRequestNumLimit() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getReadNumLimit() {
+    return Long.MAX_VALUE;
+  }
+
+  @Override
+  public long getWriteNumLimit() {
+    return Long.MAX_VALUE;
+  }
+
   @Override
   public long getReadAvailable() {
     throw new UnsupportedOperationException();
@@ -75,6 +90,11 @@ class NoopQuotaLimiter implements QuotaLimiter {
     return Long.MAX_VALUE;
   }
 
+  @Override
+  public long getWriteLimit() {
+    return Long.MAX_VALUE;
+  }
+
   @Override
   public String toString() {
     return "NoopQuotaLimiter";
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
index 8d00a702e25..12e4c4a7c6a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java
@@ -79,6 +79,19 @@ public interface QuotaLimiter {
   /** Returns the maximum number of bytes ever available to read */
   long getReadLimit();
 
+  /** Returns the maximum number of bytes ever available to write */
+  long getWriteLimit();
+
   /** Returns the number of bytes available to write to avoid exceeding the 
quota */
   long getWriteAvailable();
+
+  /** Returns the maximum number of requests to allow per TimeUnit */
+  long getRequestNumLimit();
+
+  /** Returns the maximum number of reads to allow per TimeUnit */
+  long getReadNumLimit();
+
+  /** Returns the maximum number of writes to allow per TimeUnit */
+  long getWriteNumLimit();
+
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index e6e143343f7..f5170b09c83 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -240,6 +240,27 @@ public class TimeBasedLimiter implements QuotaLimiter {
     return writeSizeLimiter.getAvailable();
   }
 
+  @Override
+  public long getRequestNumLimit() {
+    long readAndWriteLimit = readReqsLimiter.getLimit() + 
writeReqsLimiter.getLimit();
+
+    if (readAndWriteLimit < 0) { // handle overflow
+      readAndWriteLimit = Long.MAX_VALUE;
+    }
+
+    return Math.min(reqsLimiter.getLimit(), readAndWriteLimit);
+  }
+
+  @Override
+  public long getReadNumLimit() {
+    return readReqsLimiter.getLimit();
+  }
+
+  @Override
+  public long getWriteNumLimit() {
+    return writeReqsLimiter.getLimit();
+  }
+
   @Override
   public long getReadAvailable() {
     return readSizeLimiter.getAvailable();
@@ -250,6 +271,11 @@ public class TimeBasedLimiter implements QuotaLimiter {
     return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
   }
 
+  @Override
+  public long getWriteLimit() {
+    return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit());
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
index 4684be02d69..a6b7ba6fee5 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java
@@ -18,21 +18,37 @@
 package org.apache.hadoop.hbase.quotas;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
+
 @Category({ RegionServerTests.class, SmallTests.class })
 public class TestDefaultOperationQuota {
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestDefaultOperationQuota.class);
 
+  private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge();
+  static {
+    envEdge.setValue(EnvironmentEdgeManager.currentTime());
+    // only activate the envEdge for the quotas package
+    EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge,
+      ThrottleQuotaTestUtil.class.getPackage().getName());
+  }
+
   @Test
   public void testScanEstimateNewScanner() {
     long blockSize = 64 * 1024;
@@ -125,4 +141,155 @@ public class TestDefaultOperationQuota {
     // shrinking workload should only shrink estimate to maxBBS
     assertEquals(maxBlockBytesScanned, estimate);
   }
+
+  @Test
+  public void testLargeBatchSaturatesReadNumLimit()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new 
Configuration(), 65536, limiter);
+
+    // use the whole limit
+    quota.checkBatchQuota(0, limit);
+
+    // the next request should be rejected
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 
1));
+
+    envEdge.incValue(1000);
+    // after the TimeUnit, the limit should be refilled
+    quota.checkBatchQuota(0, limit);
+  }
+
+  @Test
+  public void testLargeBatchSaturatesReadWriteLimit()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new 
Configuration(), 65536, limiter);
+
+    // use the whole limit
+    quota.checkBatchQuota(limit, 0);
+
+    // the next request should be rejected
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 
0));
+
+    envEdge.incValue(1000);
+    // after the TimeUnit, the limit should be refilled
+    quota.checkBatchQuota(limit, 0);
+  }
+
+  @Test
+  public void testTooLargeReadBatchIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new 
Configuration(), 65536, limiter);
+
+    // use more than the limit, which should succeed rather than being 
indefinitely blocked
+    quota.checkBatchQuota(0, 10 + limit);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 
1));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we 
oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 
limit));
+  }
+
+  @Test
+  public void testTooLargeWriteBatchIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 10;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new 
Configuration(), 65536, limiter);
+
+    // use more than the limit, which should succeed rather than being 
indefinitely blocked
+    quota.checkBatchQuota(10 + limit, 0);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 
0));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we 
oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> 
quota.checkBatchQuota(limit, 0));
+  }
+
+  @Test
+  public void testTooLargeWriteSizeIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    int limit = 50;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota = new DefaultOperationQuota(new 
Configuration(), 65536, limiter);
+
+    // writes are estimated at 100 bytes, so this will use 2x the limit but 
should not be blocked
+    quota.checkBatchQuota(1, 0);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 
0));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we 
oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> 
quota.checkBatchQuota(limit, 0));
+  }
+
+  @Test
+  public void testTooLargeReadSizeIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    long blockSize = 65536;
+    long limit = blockSize / 2;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota =
+      new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
+
+    // reads are estimated at 1 block each, so this will use ~2x the limit but 
should not be blocked
+    quota.checkBatchQuota(0, 1);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 
1));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we 
oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> 
quota.checkBatchQuota((int) limit, 1));
+  }
+
+  @Test
+  public void testTooLargeRequestSizeIsNotBlocked()
+    throws RpcThrottlingException, InterruptedException {
+    long blockSize = 65536;
+    long limit = blockSize / 2;
+    QuotaProtos.Throttle throttle =
+      
QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder()
+        
.setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build();
+    QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle);
+    DefaultOperationQuota quota =
+      new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter);
+
+    // reads are estimated at 1 block each, so this will use ~2x the limit but 
should not be blocked
+    quota.checkBatchQuota(0, 1);
+
+    // the next request should be blocked
+    assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 
1));
+
+    envEdge.incValue(1000);
+    // even after the TimeUnit, the limit should not be refilled because we 
oversubscribed
+    assertThrows(RpcThrottlingException.class, () -> 
quota.checkBatchQuota((int) limit, 1));
+  }
 }

Reply via email to