This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-3 by this push:
     new 3ea4fef75c2 HBASE-27687 Enhance quotas to consume blockBytesScanned rather than response size (#5654)
3ea4fef75c2 is described below

commit 3ea4fef75c25e05df71f178d5fc37f6eec0d3cf3
Author: Ray Mattingly <rmdmattin...@gmail.com>
AuthorDate: Tue Feb 6 15:28:05 2024 -0500

    HBASE-27687 Enhance quotas to consume blockBytesScanned rather than response size (#5654)
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../hadoop/hbase/quotas/DefaultOperationQuota.java |  34 ++-
 .../hadoop/hbase/quotas/ExceedOperationQuota.java  |   6 +-
 .../apache/hadoop/hbase/quotas/OperationQuota.java |  10 +
 .../hbase/quotas/RegionServerRpcQuotaManager.java  |  20 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  10 +
 .../apache/hadoop/hbase/regionserver/Region.java   |   6 +
 .../hbase/quotas/TestBlockBytesScannedQuota.java   | 233 +++++++++++++++++++++
 .../hadoop/hbase/quotas/ThrottleQuotaTestUtil.java |  63 ++++++
 8 files changed, 366 insertions(+), 16 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
index ddf804243ed..4b89e18a802 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java
@@ -22,6 +22,8 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.yetus.audience.InterfaceStability;
 
@@ -49,9 +51,15 @@ public class DefaultOperationQuota implements OperationQuota 
{
   protected long readDiff = 0;
   protected long writeCapacityUnitDiff = 0;
   protected long readCapacityUnitDiff = 0;
+  private boolean useResultSizeBytes;
+  private long blockSizeBytes;
 
-  public DefaultOperationQuota(final Configuration conf, final QuotaLimiter... 
limiters) {
+  public DefaultOperationQuota(final Configuration conf, final int 
blockSizeBytes,
+    final QuotaLimiter... limiters) {
     this(conf, Arrays.asList(limiters));
+    this.useResultSizeBytes =
+      conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, 
USE_RESULT_SIZE_BYTES_DEFAULT);
+    this.blockSizeBytes = blockSizeBytes;
   }
 
   /**
@@ -94,8 +102,17 @@ public class DefaultOperationQuota implements 
OperationQuota {
   public void close() {
     // Adjust the quota consumed for the specified operation
     writeDiff = operationSize[OperationType.MUTATE.ordinal()] - writeConsumed;
-    readDiff = operationSize[OperationType.GET.ordinal()]
-      + operationSize[OperationType.SCAN.ordinal()] - readConsumed;
+
+    long resultSize =
+      operationSize[OperationType.GET.ordinal()] + 
operationSize[OperationType.SCAN.ordinal()];
+    if (useResultSizeBytes) {
+      readDiff = resultSize - readConsumed;
+    } else {
+      long blockBytesScanned =
+        
RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L);
+      readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed;
+    }
+
     writeCapacityUnitDiff =
       
calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], 
writeConsumed);
     readCapacityUnitDiff = calculateReadCapacityUnitDiff(
@@ -140,8 +157,15 @@ public class DefaultOperationQuota implements 
OperationQuota {
    */
   protected void updateEstimateConsumeQuota(int numWrites, int numReads, int 
numScans) {
     writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100);
-    readConsumed = estimateConsume(OperationType.GET, numReads, 100);
-    readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
+
+    if (useResultSizeBytes) {
+      readConsumed = estimateConsume(OperationType.GET, numReads, 100);
+      readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000);
+    } else {
+      // assume 1 block required for reads. this is probably a low estimate, 
which is okay
+      readConsumed = numReads > 0 ? blockSizeBytes : 0;
+      readConsumed += numScans > 0 ? blockSizeBytes : 0;
+    }
 
     writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed);
     readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed);
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
index 1b7200f5f22..1788e550f22 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java
@@ -40,9 +40,9 @@ public class ExceedOperationQuota extends 
DefaultOperationQuota {
   private static final Logger LOG = 
LoggerFactory.getLogger(ExceedOperationQuota.class);
   private QuotaLimiter regionServerLimiter;
 
-  public ExceedOperationQuota(final Configuration conf, QuotaLimiter 
regionServerLimiter,
-    final QuotaLimiter... limiters) {
-    super(conf, limiters);
+  public ExceedOperationQuota(final Configuration conf, int blockSizeBytes,
+    QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) {
+    super(conf, blockSizeBytes, limiters);
     this.regionServerLimiter = regionServerLimiter;
   }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
index aaae64b6184..e18d3eb3495 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java
@@ -35,6 +35,16 @@ public interface OperationQuota {
     SCAN
   }
 
+  /**
+   * If false, the default, then IO based throttles will consume read availability based on the
+   * block bytes scanned by the given request. If true then IO based throttles will use result size
+   * rather than block bytes scanned. Using block bytes scanned should be preferable to using result
+   * size, because otherwise access patterns like heavily filtered scans may be able to produce a
+   * significant and effectively un-throttled workload.
+   */
+  String USE_RESULT_SIZE_BYTES = "hbase.quota.use.result.size.bytes";
+  boolean USE_RESULT_SIZE_BYTES_DEFAULT = false;
+
   /**
    * Checks if it is possible to execute the specified operation. The quota 
will be estimated based
    * on the number of operations to perform and the average size accumulated 
during time.
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
index 4b09c0308f9..de76303e27a 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.ipc.RpcScheduler;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.regionserver.Region;
@@ -113,7 +114,8 @@ public class RegionServerRpcQuotaManager {
    * @param table the table where the operation will be executed
    * @return the OperationQuota
    */
-  public OperationQuota getQuota(final UserGroupInformation ugi, final 
TableName table) {
+  public OperationQuota getQuota(final UserGroupInformation ugi, final 
TableName table,
+    final int blockSizeBytes) {
     if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) {
       UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
       QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
@@ -123,7 +125,8 @@ public class RegionServerRpcQuotaManager {
           LOG.trace("get quota for ugi=" + ugi + " table=" + table + " 
userLimiter=" + userLimiter);
         }
         if (!useNoop) {
-          return new DefaultOperationQuota(this.rsServices.getConfiguration(), 
userLimiter);
+          return new DefaultOperationQuota(this.rsServices.getConfiguration(), 
blockSizeBytes,
+            userLimiter);
         }
       } else {
         QuotaLimiter nsLimiter = 
quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
@@ -139,11 +142,11 @@ public class RegionServerRpcQuotaManager {
         }
         if (!useNoop) {
           if (exceedThrottleQuotaEnabled) {
-            return new 
ExceedOperationQuota(this.rsServices.getConfiguration(), rsLimiter,
-              userLimiter, tableLimiter, nsLimiter);
+            return new 
ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
+              rsLimiter, userLimiter, tableLimiter, nsLimiter);
           } else {
-            return new 
DefaultOperationQuota(this.rsServices.getConfiguration(), userLimiter,
-              tableLimiter, nsLimiter, rsLimiter);
+            return new 
DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes,
+              userLimiter, tableLimiter, nsLimiter, rsLimiter);
           }
         }
       }
@@ -213,9 +216,10 @@ public class RegionServerRpcQuotaManager {
     } else {
       ugi = User.getCurrent().getUGI();
     }
-    TableName table = region.getTableDescriptor().getTableName();
+    TableDescriptor tableDescriptor = region.getTableDescriptor();
+    TableName table = tableDescriptor.getTableName();
 
-    OperationQuota quota = getQuota(ugi, table);
+    OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes());
     try {
       quota.checkQuota(numWrites, numReads, numScans);
     } catch (RpcThrottlingException e) {
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 0dc96747dd3..ae4045b1216 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -458,6 +458,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
 
   private final CellComparator cellComparator;
 
+  private final int minBlockSizeBytes;
+
   /**
    * @return The smallest mvcc readPoint across all the scanners in this 
region. Writes older than
    *         this readPoint, are included in every read operation.
@@ -916,6 +918,9 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
           .remove(getRegionInfo().getEncodedName());
       }
     }
+
+    minBlockSizeBytes = 
Arrays.stream(this.htableDescriptor.getColumnFamilies())
+      
.mapToInt(ColumnFamilyDescriptor::getBlocksize).min().orElse(HConstants.DEFAULT_BLOCKSIZE);
   }
 
   private void setHTableSpecificConf() {
@@ -2047,6 +2052,11 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
     return new ReadOnlyConfiguration(this.conf);
   }
 
+  @Override
+  public int getMinBlockSizeBytes() {
+    return minBlockSizeBytes;
+  }
+
   private ThreadPoolExecutor getStoreOpenAndCloseThreadPool(final String 
threadNamePrefix) {
     int numStores = Math.max(1, this.htableDescriptor.getColumnFamilyCount());
     int maxThreads = Math.min(numStores, 
conf.getInt(HConstants.HSTORE_OPEN_AND_CLOSE_THREADS_MAX,
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 6a897a5b9f3..42069e58092 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -571,4 +571,10 @@ public interface Region extends ConfigurationObserver {
    *         if you try to set a configuration.
    */
   Configuration getReadOnlyConfiguration();
+
+  /**
+   * The minimum block size configuration from all relevant column families. This is used when
+   * estimating quota consumption.
+   */
+  int getMinBlockSizeBytes();
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
new file mode 100644
index 00000000000..e27ba123381
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doGets;
+import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doMultiGets;
+import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts;
+import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans;
+import static 
org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
+import static 
org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestBlockBytesScannedQuota {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBlockBytesScannedQuota.class);
+
+  private final static Logger LOG = 
LoggerFactory.getLogger(TestBlockBytesScannedQuota.class);
+
+  private static final int REFRESH_TIME = 5000;
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+  private static final byte[] FAMILY = Bytes.toBytes("cf");
+  private static final byte[] QUALIFIER = Bytes.toBytes("q");
+
+  private static final TableName TABLE_NAME = 
TableName.valueOf("BlockBytesScannedQuotaTest");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // client should fail fast
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10);
+    
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+
+    // quotas enabled, using block bytes scanned
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 
REFRESH_TIME);
+
+    // don't cache blocks to make IO predictable
+    
TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 
0.0f);
+
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    QuotaCache.TEST_FORCE_REFRESH = true;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    EnvironmentEdgeManager.reset();
+    TEST_UTIL.deleteTable(TABLE_NAME);
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
+  }
+
+  @Test
+  public void testBBSGet() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final String userName = User.getCurrent().getShortName();
+    int blockSize = 
admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
+    Table table = admin.getConnection().getTable(TABLE_NAME);
+
+    doPuts(10_000, FAMILY, QUALIFIER, table);
+    TEST_UTIL.flush(TABLE_NAME);
+
+    // Add ~10 block/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, 
ThrottleType.READ_SIZE,
+      Math.round(10.1 * blockSize), TimeUnit.MINUTES));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+
+    // should execute at max 10 requests
+    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
+
+    // wait a minute and you should get another 10 requests executed
+    waitMinuteQuota();
+    testTraffic(() -> doGets(20, FAMILY, QUALIFIER, table), 10, 1);
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
+    testTraffic(() -> doGets(100, FAMILY, QUALIFIER, table), 100, 0);
+  }
+
+  @Test
+  public void testBBSScan() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final String userName = User.getCurrent().getShortName();
+    int blockSize = 
admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
+    Table table = admin.getConnection().getTable(TABLE_NAME);
+
+    doPuts(10_000, FAMILY, QUALIFIER, table);
+    TEST_UTIL.flush(TABLE_NAME);
+
+    // Add 1 block/min limit.
+    // This should only allow 1 scan per minute, because we estimate 1 block 
per scan
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, 
ThrottleType.REQUEST_SIZE, blockSize,
+      TimeUnit.MINUTES));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // should execute 1 request
+    testTraffic(() -> doScans(5, table), 1, 0);
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+    testTraffic(() -> doScans(100, table), 100, 0);
+    testTraffic(() -> doScans(100, table), 100, 0);
+
+    // Add ~3 block/min limit. This should support >1 scans
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, 
ThrottleType.REQUEST_SIZE,
+      Math.round(3.1 * blockSize), TimeUnit.MINUTES));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+
+    // should execute some requests, but not all
+    testTraffic(() -> doScans(100, table), 100, 90);
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+    testTraffic(() -> doScans(100, table), 100, 0);
+    testTraffic(() -> doScans(100, table), 100, 0);
+  }
+
+  @Test
+  public void testBBSMultiGet() throws Exception {
+    final Admin admin = TEST_UTIL.getAdmin();
+    final String userName = User.getCurrent().getShortName();
+    int blockSize = 
admin.getDescriptor(TABLE_NAME).getColumnFamily(FAMILY).getBlocksize();
+    Table table = admin.getConnection().getTable(TABLE_NAME);
+    int rowCount = 10_000;
+
+    doPuts(rowCount, FAMILY, QUALIFIER, table);
+    TEST_UTIL.flush(TABLE_NAME);
+
+    // Add 1 block/min limit.
+    // This should only allow 1 multiget per minute, because we estimate 1 
block per multiget
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, 
ThrottleType.REQUEST_SIZE, blockSize,
+      TimeUnit.MINUTES));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+    waitMinuteQuota();
+
+    // should execute 1 request
+    testTraffic(() -> doMultiGets(10, 10, rowCount, FAMILY, QUALIFIER, table), 
1, 1);
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, 
table), 100, 0);
+    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, 
table), 100, 0);
+
+    // Add ~100 block/min limit
+    admin.setQuota(QuotaSettingsFactory.throttleUser(userName, 
ThrottleType.REQUEST_SIZE,
+      Math.round(100.1 * blockSize), TimeUnit.MINUTES));
+    triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
+
+    // should execute approximately 10 batches of 10 requests
+    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 
10, 1);
+
+    // wait a minute and you should get another ~10 batches of 10 requests
+    waitMinuteQuota();
+    testTraffic(() -> doMultiGets(20, 10, rowCount, FAMILY, QUALIFIER, table), 
10, 1);
+
+    // Remove all the limits
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, 
table), 100, 0);
+    testTraffic(() -> doMultiGets(100, 10, rowCount, FAMILY, QUALIFIER, 
table), 100, 0);
+  }
+
+  private void testTraffic(Callable<Long> trafficCallable, long 
expectedSuccess, long marginOfError)
+    throws Exception {
+    TEST_UTIL.waitFor(90_000, () -> {
+      long actualSuccess;
+      try {
+        actualSuccess = trafficCallable.call();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+      LOG.info("Traffic test yielded {} successful requests. Expected {} +/- 
{}", actualSuccess,
+        expectedSuccess, marginOfError);
+      boolean success = (actualSuccess >= expectedSuccess - marginOfError)
+        && (actualSuccess <= expectedSuccess + marginOfError);
+      if (!success) {
+        triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME);
+        waitMinuteQuota();
+        Thread.sleep(15_000L);
+      }
+      return success;
+    });
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
index de6f5653ad2..bc2d0ae0713 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java
@@ -18,12 +18,17 @@
 package org.apache.hadoop.hbase.quotas;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Objects;
+import java.util.Random;
 import org.apache.hadoop.hbase.HBaseTestingUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -105,6 +110,64 @@ public final class ThrottleQuotaTestUtil {
     return count;
   }
 
+  static long doGets(int maxOps, byte[] family, byte[] qualifier, final 
Table... tables) {
+    int count = 0;
+    try {
+      while (count < maxOps) {
+        Get get = new Get(Bytes.toBytes("row-" + count));
+        get.addColumn(family, qualifier);
+        for (final Table table : tables) {
+          table.get(get);
+        }
+        count += tables.length;
+      }
+    } catch (IOException e) {
+      LOG.error("get failed after nRetries=" + count, e);
+    }
+    return count;
+  }
+
+  static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] 
family, byte[] qualifier,
+    final Table... tables) {
+    int opCount = 0;
+    Random random = new Random();
+    try {
+      while (opCount < maxOps) {
+        List<Get> gets = new ArrayList<>(batchSize);
+        while (gets.size() < batchSize) {
+          Get get = new Get(Bytes.toBytes("row-" + random.nextInt(rowCount)));
+          get.addColumn(family, qualifier);
+          gets.add(get);
+        }
+        for (final Table table : tables) {
+          table.get(gets);
+        }
+        opCount += tables.length;
+      }
+    } catch (IOException e) {
+      LOG.error("multiget failed after nRetries=" + opCount, e);
+    }
+    return opCount;
+  }
+
+  static long doScans(int maxOps, Table table) {
+    int count = 0;
+    int caching = 100;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(caching);
+      scan.setCacheBlocks(false);
+      ResultScanner scanner = table.getScanner(scan);
+      while (count < (maxOps * caching)) {
+        scanner.next();
+        count += 1;
+      }
+    } catch (IOException e) {
+      LOG.error("scan failed after nRetries=" + count, e);
+    }
+    return count / caching;
+  }
+
   static void triggerUserCacheRefresh(HBaseTestingUtil testUtil, boolean 
bypass,
     TableName... tables) throws Exception {
     triggerCacheRefresh(testUtil, bypass, true, false, false, false, false, 
tables);

Reply via email to