This is an automated email from the ASF dual-hosted git repository. bbeaudreault pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new bdb3f216e86 HBASE-28385 Improve scan quota estimates when using block bytes scanned (#5713) bdb3f216e86 is described below commit bdb3f216e864e20eb2b09352707a751a5cf7460f Author: Ray Mattingly <rmdmattin...@gmail.com> AuthorDate: Wed Mar 13 17:58:54 2024 -0400 HBASE-28385 Improve scan quota estimates when using block bytes scanned (#5713) Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org> --- .../hadoop/hbase/quotas/DefaultOperationQuota.java | 89 ++++++++++++-- .../hadoop/hbase/quotas/ExceedOperationQuota.java | 33 +++++- .../hadoop/hbase/quotas/NoopOperationQuota.java | 10 +- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 5 + .../apache/hadoop/hbase/quotas/OperationQuota.java | 20 +++- .../apache/hadoop/hbase/quotas/QuotaLimiter.java | 3 + .../hbase/quotas/RegionServerRpcQuotaManager.java | 80 ++++++++++--- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 5 + .../hadoop/hbase/regionserver/RSRpcServices.java | 31 ++++- .../hbase/quotas/TestBlockBytesScannedQuota.java | 71 ++++++++++-- .../hbase/quotas/TestDefaultOperationQuota.java | 128 +++++++++++++++++++++ .../hadoop/hbase/quotas/ThrottleQuotaTestUtil.java | 7 +- 12 files changed, 427 insertions(+), 55 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index a4ff8b2a859..2e26765a6a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -27,10 +27,17 @@ import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + @InterfaceAudience.Private @InterfaceStability.Evolving public class DefaultOperationQuota implements OperationQuota { + // a single scan estimate can consume no more than this proportion of the limiter's limit + // this prevents a long-running scan from being estimated at, say, 100MB of IO against + // a <100MB/IO throttle (because this would never succeed) + private static final double MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION = 0.9; + protected final List<QuotaLimiter> limiters; private final long writeCapacityUnit; private final long readCapacityUnit; @@ -53,6 +60,7 @@ public class DefaultOperationQuota implements OperationQuota { protected long readCapacityUnitDiff = 0; private boolean useResultSizeBytes; private long blockSizeBytes; + private long maxScanEstimate; public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, final QuotaLimiter... limiters) { @@ -60,6 +68,9 @@ public class DefaultOperationQuota implements OperationQuota { this.useResultSizeBytes = conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); this.blockSizeBytes = blockSizeBytes; + long readSizeLimit = + Arrays.stream(limiters).mapToLong(QuotaLimiter::getReadLimit).min().orElse(Long.MAX_VALUE); + maxScanEstimate = Math.round(MAX_SCAN_ESTIMATE_PROPORTIONAL_LIMIT_CONSUMPTION * readSizeLimit); } /** @@ -80,21 +91,34 @@ public class DefaultOperationQuota implements OperationQuota { } @Override - public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { - updateEstimateConsumeQuota(numWrites, numReads, numScans); + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + updateEstimateConsumeBatchQuota(numWrites, numReads); + checkQuota(numWrites, numReads); + } + + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { + updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, + prevBlockBytesScannedDifference); + checkQuota(0, 1); + } + private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException { readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { - if (limiter.isBypass()) continue; + if (limiter.isBypass()) { + continue; + } - limiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, + limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } for (final QuotaLimiter limiter : limiters) { - limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, + readCapacityUnitConsumed); } } @@ -158,24 +182,69 @@ public class DefaultOperationQuota implements OperationQuota { * Update estimate quota(read/write size/capacityUnits) which will be consumed * @param numWrites the number of write requests * @param numReads the number of read requests - * @param numScans the number of scan requests */ - protected void updateEstimateConsumeQuota(int numWrites, int numReads, int numScans) { + protected void updateEstimateConsumeBatchQuota(int numWrites, int numReads) { writeConsumed = estimateConsume(OperationType.MUTATE, numWrites, 100); if (useResultSizeBytes) { readConsumed = estimateConsume(OperationType.GET, numReads, 100); - readConsumed += estimateConsume(OperationType.SCAN, numScans, 1000); } else { // assume 1 block required for reads. this is probably a low estimate, which is okay readConsumed = numReads > 0 ? blockSizeBytes : 0; - readConsumed += numScans > 0 ? blockSizeBytes : 0; } writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); } + /** + * Update estimate quota(read/write size/capacityUnits) which will be consumed + * @param scanRequest the scan to be executed + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls + */ + protected void updateEstimateConsumeScanQuota(ClientProtos.ScanRequest scanRequest, + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { + if (useResultSizeBytes) { + readConsumed = estimateConsume(OperationType.SCAN, 1, 1000); + } else { + long estimate = getScanReadConsumeEstimate(blockSizeBytes, scanRequest.getNextCallSeq(), + maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); + readConsumed = Math.min(maxScanEstimate, estimate); + } + + readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); + } + + protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq, + long maxScannerResultSize, long maxBlockBytesScanned, long prevBlockBytesScannedDifference) { + /* + * Estimating scan workload is more complicated, and if we severely underestimate workloads then + * throttled clients will exhaust retries too quickly, and could saturate the RPC layer + */ + if (nextCallSeq == 0) { + // start scanners with an optimistic 1 block IO estimate + // it is better to underestimate a large scan in the beginning + // than to overestimate, and block, a small scan + return blockSizeBytes; + } + + boolean isWorkloadGrowing = prevBlockBytesScannedDifference > blockSizeBytes; + if (isWorkloadGrowing) { + // if nextCallSeq > 0 and the workload is growing then our estimate + // should consider that the workload may continue to increase + return Math.min(maxScannerResultSize, nextCallSeq * maxBlockBytesScanned); + } else { + // if nextCallSeq > 0 and the workload is shrinking or flat + // then our workload has likely plateaued. We can just rely on the existing + // maxBlockBytesScanned as our estimate in this case. + return maxBlockBytesScanned; + } + } + private long estimateConsume(final OperationType type, int numReqs, long avgSize) { if (numReqs > 0) { return avgSize * numReqs; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 1788e550f22..3077d6dac53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -23,6 +23,8 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /* * Internal class used to check and consume quota if exceed throttle quota is enabled. Exceed * throttle quota means, user can over consume user/namespace/table quota if region server has @@ -47,15 +49,32 @@ public class ExceedOperationQuota extends DefaultOperationQuota { } @Override - public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads); + CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads); + checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0); + } + + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { + Runnable estimateQuota = () -> updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, + maxBlockBytesScanned, prevBlockBytesScannedDifference); + CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize, + maxBlockBytesScanned, prevBlockBytesScannedDifference); + checkQuota(estimateQuota, checkQuota, 0, 0, 1); + } + + private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites, + int numReads, int numScans) throws RpcThrottlingException { if (regionServerLimiter.isBypass()) { // If region server limiter is bypass, which means no region server quota is set, check and // throttle by all other quotas. In this condition, exceed throttle quota will not work. LOG.warn("Exceed throttle quota is enabled but no region server quotas found"); - super.checkQuota(numWrites, numReads, numScans); + checkQuota.run(); } else { // 1. Update estimate quota which will be consumed - updateEstimateConsumeQuota(numWrites, numReads, numScans); + estimateQuota.run(); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, writeCapacityUnitConsumed, readCapacityUnitConsumed); @@ -63,11 +82,11 @@ public class ExceedOperationQuota extends DefaultOperationQuota { // limiter is enough. boolean exceed = false; try { - super.checkQuota(numWrites, numReads, numScans); + checkQuota.run(); } catch (RpcThrottlingException e) { exceed = true; if (LOG.isDebugEnabled()) { - LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{} scan:{}, " + LOG.debug("Read/Write requests num exceeds quota: writes:{} reads:{}, scans:{}, " + "try use region server quota", numWrites, numReads, numScans); } } @@ -96,4 +115,8 @@ public class ExceedOperationQuota extends DefaultOperationQuota { regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff); } } + + private interface CheckQuotaRunnable { + void run() throws RpcThrottlingException; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index b64429d9adc..736560e6fd1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Noop operation quota returned when no quota is associated to the user/table */ @@ -40,7 +42,13 @@ class NoopOperationQuota implements OperationQuota { } @Override - public void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + // no-op + } + + @Override + public void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 63d7610115a..cf1e49c12e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -70,6 +70,11 @@ class NoopQuotaLimiter implements QuotaLimiter { throw new UnsupportedOperationException(); } + @Override + public long getReadLimit() { + return Long.MAX_VALUE; + } + @Override public String toString() { return "NoopQuotaLimiter"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index bedad5e9867..ef0a35fa589 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; + /** * Interface that allows to check the quota available for an operation. */ @@ -51,11 +53,25 @@ public interface OperationQuota { * on the number of operations to perform and the average size accumulated during time. * @param numWrites number of write operation that will be performed * @param numReads number of small-read operation that will be performed - * @param numScans number of long-read operation that will be performed * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is * exceeded. */ - void checkQuota(int numWrites, int numReads, int numScans) throws RpcThrottlingException; + void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException; + + /** + * Checks if it is possible to execute the scan. The quota will be estimated based on the + * composition of the scan. + * @param scanRequest the given scan operation + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls + * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is + * exceeded. + */ + void checkScanQuota(ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException; /** Cleanup method on operation completion */ void close(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 14326e4e0d2..8d00a702e25 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -76,6 +76,9 @@ public interface QuotaLimiter { /** Returns the number of bytes available to read to avoid exceeding the quota */ long getReadAvailable(); + /** Returns the maximum number of bytes ever available to read */ + long getReadLimit(); + /** Returns the number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index 3c72c662887..92a0cfd5c13 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -156,38 +156,82 @@ public class RegionServerRpcQuotaManager { /** * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. + * available quota and to report the data/usage of the operation. This method is specific to scans + * because estimating a scan's workload is more complicated than estimating the workload of a + * get/put. + * @param region the region where the operation will be performed + * @param scanRequest the scan to be estimated against the quota + * @param maxScannerResultSize the maximum bytes to be returned by the scanner + * @param maxBlockBytesScanned the maximum bytes scanned in a single RPC call by the + * scanner + * @param prevBlockBytesScannedDifference the difference between BBS of the previous two next + * calls + * @return the OperationQuota + * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. + */ + public OperationQuota checkScanQuota(final Region region, + final ClientProtos.ScanRequest scanRequest, long maxScannerResultSize, + long maxBlockBytesScanned, long prevBlockBytesScannedDifference) + throws IOException, RpcThrottlingException { + Optional<User> user = RpcServer.getRequestUser(); + UserGroupInformation ugi; + if (user.isPresent()) { + ugi = user.get().getUGI(); + } else { + ugi = User.getCurrent().getUGI(); + } + TableDescriptor tableDescriptor = region.getTableDescriptor(); + TableName table = tableDescriptor.getTableName(); + + OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); + try { + quota.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, + prevBlockBytesScannedDifference); + } catch (RpcThrottlingException e) { + LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " scan=" + + scanRequest.getScannerId() + ": " + e.getMessage()); + throw e; + } + return quota; + } + + /** + * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. * @param region the region where the operation will be performed * @param type the operation type * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - public OperationQuota checkQuota(final Region region, final OperationQuota.OperationType type) - throws IOException, RpcThrottlingException { + public OperationQuota checkBatchQuota(final Region region, + final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { switch (type) { - case SCAN: - return checkQuota(region, 0, 0, 1); case GET: - return checkQuota(region, 0, 1, 0); + return this.checkBatchQuota(region, 0, 1); case MUTATE: - return checkQuota(region, 1, 0, 0); + return this.checkBatchQuota(region, 1, 0); case CHECK_AND_MUTATE: - return checkQuota(region, 1, 1, 0); + return this.checkBatchQuota(region, 1, 1); } throw new RuntimeException("Invalid operation type: " + type); } /** * Check the quota for the current (rpc-context) user. Returns the OperationQuota used to get the - * available quota and to report the data/usage of the operation. + * available quota and to report the data/usage of the operation. This method does not support + * scans because estimating a scan's workload is more complicated than estimating the workload of + * a get/put. * @param region the region where the operation will be performed * @param actions the "multi" actions to perform * @param hasCondition whether the RegionAction has a condition * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - public OperationQuota checkQuota(final Region region, final List<ClientProtos.Action> actions, - boolean hasCondition) throws IOException, RpcThrottlingException { + public OperationQuota checkBatchQuota(final Region region, + final List<ClientProtos.Action> actions, boolean hasCondition) + throws IOException, RpcThrottlingException { int numWrites = 0; int numReads = 0; for (final ClientProtos.Action action : actions) { @@ -202,7 +246,7 @@ public class RegionServerRpcQuotaManager { numReads++; } } - return checkQuota(region, numWrites, numReads, 0); + return checkBatchQuota(region, numWrites, numReads); } /** @@ -211,12 +255,11 @@ public class RegionServerRpcQuotaManager { * @param region the region where the operation will be performed * @param numWrites number of writes to perform * @param numReads number of short-reads to perform - * @param numScans number of scan to perform * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - private OperationQuota checkQuota(final Region region, final int numWrites, final int numReads, - final int numScans) throws IOException, RpcThrottlingException { + private OperationQuota checkBatchQuota(final Region region, final int numWrites, + final int numReads) throws IOException, RpcThrottlingException { Optional<User> user = RpcServer.getRequestUser(); UserGroupInformation ugi; if (user.isPresent()) { @@ -229,11 +272,10 @@ public class RegionServerRpcQuotaManager { OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { - quota.checkQuota(numWrites, numReads, numScans); + quota.checkBatchQuota(numWrites, numReads); } catch (RpcThrottlingException e) { - LOG.debug( - "Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" - + numWrites + " numReads=" + numReads + " numScans=" + numScans + ": " + e.getMessage()); + LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); throw e; } return quota; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index 8ae2cae0188..483edbcd3a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -243,6 +243,11 @@ public class TimeBasedLimiter implements QuotaLimiter { return readSizeLimiter.getAvailable(); } + @Override + public long getReadLimit() { + return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index bb59dd7ec90..53a11fa3177 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -486,6 +486,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin private boolean fullRegionScan; private final String clientIPAndPort; private final String userName; + private volatile long maxBlockBytesScanned = 0; + private volatile long prevBlockBytesScanned = 0; + private volatile long prevBlockBytesScannedDifference = 0; RegionScannerHolder(RegionScanner s, HRegion r, RpcCallback closeCallBack, RpcCallback shippedCallback, boolean needCursor, boolean fullRegionScan, @@ -509,6 +512,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin return nextCallSeq.compareAndSet(currentSeq, currentSeq + 1); } + long getMaxBlockBytesScanned() { + return maxBlockBytesScanned; + } + + long getPrevBlockBytesScannedDifference() { + return prevBlockBytesScannedDifference; + } + + void updateBlockBytesScanned(long blockBytesScanned) { + prevBlockBytesScannedDifference = blockBytesScanned - prevBlockBytesScanned; + prevBlockBytesScanned = blockBytesScanned; + if (blockBytesScanned > maxBlockBytesScanned) { + maxBlockBytesScanned = blockBytesScanned; + } + } + // Should be called only when we need to print lease expired messages otherwise // cache the String once made. @Override @@ -2551,7 +2570,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin } Boolean existence = null; Result r = null; - quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.GET); + quota = getRpcQuotaManager().checkBatchQuota(region, OperationQuota.OperationType.GET); Get clientGet = ProtobufUtil.toGet(get); if (get.getExistenceOnly() && region.getCoprocessorHost() != null) { @@ -2753,7 +2772,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin try { region = getRegion(regionSpecifier); - quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(), + quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), regionAction.hasCondition()); } catch (IOException e) { failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); @@ -2805,7 +2824,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin try { region = getRegion(regionSpecifier); - quota = getRpcQuotaManager().checkQuota(region, regionAction.getActionList(), + quota = getRpcQuotaManager().checkBatchQuota(region, regionAction.getActionList(), regionAction.hasCondition()); } catch (IOException e) { failRegionAction(responseBuilder, regionActionResultBuilder, regionAction, cellScanner, e); @@ -2962,7 +2981,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin } long nonceGroup = request.hasNonceGroup() ? request.getNonceGroup() : HConstants.NO_NONCE; OperationQuota.OperationType operationType = QuotaUtil.getQuotaOperationType(request); - quota = getRpcQuotaManager().checkQuota(region, operationType); + quota = getRpcQuotaManager().checkBatchQuota(region, operationType); ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements(); @@ -3515,6 +3534,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin if (rpcCall != null) { responseCellSize = rpcCall.getResponseCellSize(); blockBytesScanned = rpcCall.getBlockBytesScanned(); + rsh.updateBlockBytesScanned(blockBytesScanned); } region.getMetrics().updateScanTime(end - before); final MetricsRegionServer metricsRegionServer = regionServer.getMetrics(); @@ -3618,7 +3638,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin } OperationQuota quota; try { - quota = getRpcQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN); + quota = getRpcQuotaManager().checkScanQuota(region, request, maxScannerResultSize, + rsh.getMaxBlockBytesScanned(), rsh.getPrevBlockBytesScannedDifference()); } catch (IOException e) { addScannerLeaseBack(lease); throw new ServiceException(e); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java index 7eb0b09336b..de3600b9ee9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestBlockBytesScannedQuota.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doPuts; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.doScans; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; +import static org.junit.Assert.assertTrue; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; @@ -60,12 +61,17 @@ public class TestBlockBytesScannedQuota { private static final byte[] QUALIFIER = Bytes.toBytes("q"); private static final TableName TABLE_NAME = TableName.valueOf("BlockBytesScannedQuotaTest"); + private static final long MAX_SCANNER_RESULT_SIZE = 100 * 1024 * 1024; @BeforeClass public static void setUpBeforeClass() throws Exception { // client should fail fast TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 10); TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, + MAX_SCANNER_RESULT_SIZE); + TEST_UTIL.getConfiguration().setClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, + AverageIntervalRateLimiter.class, RateLimiter.class); // quotas enabled, using block bytes scanned TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); @@ -140,27 +146,75 @@ public class TestBlockBytesScannedQuota { waitMinuteQuota(); // should execute 1 request - testTraffic(() -> doScans(5, table), 1, 0); + testTraffic(() -> doScans(5, table, 1), 1, 0); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - testTraffic(() -> doScans(100, table), 100, 0); - testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); // Add ~3 block/sec limit. This should support >1 scans admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, Math.round(3.1 * blockSize), TimeUnit.SECONDS)); triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // Add 50 block/sec limit. This should support >1 scans + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, + Math.round(50.1 * blockSize), TimeUnit.SECONDS)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // This will produce some throttling exceptions, but all/most should succeed within the timeout + testTraffic(() -> doScans(100, table, 1), 75, 25); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // With large caching, a big scan should succeed + testTraffic(() -> doScans(10_000, table, 10_000), 10_000, 0); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // Remove all the limits + admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); + triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + testTraffic(() -> doScans(100, table, 1), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); + } + + @Test + public void testSmallScanNeverBlockedByLargeEstimate() throws Exception { + final Admin admin = TEST_UTIL.getAdmin(); + final String userName = User.getCurrent().getShortName(); + Table table = admin.getConnection().getTable(TABLE_NAME); - // should execute some requests, but not all - testTraffic(() -> doScans(100, table), 100, 90); + doPuts(10_000, FAMILY, QUALIFIER, table); + TEST_UTIL.flush(TABLE_NAME); + + // Add 99MB/sec limit. + // This should never be blocked, but with a sequence number approaching 10k, without + // other intervention, we would estimate a scan workload approaching 625MB or the + // maxScannerResultSize (both larger than the 90MB limit). This test ensures that all + // requests succeed, so the estimate never becomes large enough to cause read downtime + long limit = 99 * 1024 * 1024; + assertTrue(limit <= MAX_SCANNER_RESULT_SIZE); // always true, but protecting against code + // changes + admin.setQuota(QuotaSettingsFactory.throttleUser(userName, ThrottleType.REQUEST_SIZE, limit, + TimeUnit.SECONDS)); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + + // should execute all requests + testTraffic(() -> doScans(10_000, table, 1), 10_000, 0); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); // Remove all the limits admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName)); triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); - testTraffic(() -> doScans(100, table), 100, 0); - testTraffic(() -> doScans(100, table), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); + testTraffic(() -> doScans(100, table, 1), 100, 0); } @Test @@ -223,9 +277,8 @@ public class TestBlockBytesScannedQuota { boolean success = (actualSuccess >= expectedSuccess - marginOfError) && (actualSuccess <= expectedSuccess + marginOfError); if (!success) { - triggerUserCacheRefresh(TEST_UTIL, true, TABLE_NAME); + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); waitMinuteQuota(); - Thread.sleep(15_000L); } return success; }); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java new file mode 100644 index 00000000000..4684be02d69 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, SmallTests.class }) +public class TestDefaultOperationQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); + + @Test + public void testScanEstimateNewScanner() { + long blockSize = 64 * 1024; + long nextCallSeq = 0; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 0; + long prevBBSDifference = 0; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // new scanner should estimate scan read as 1 block + assertEquals(blockSize, estimate); + } + + @Test + public void testScanEstimateSecondNextCall() { + long blockSize = 64 * 1024; + long nextCallSeq = 1; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 10 * blockSize; + long prevBBSDifference = 10 * blockSize; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // 2nd next call should be estimated at maxBBS + assertEquals(maxBlockBytesScanned, estimate); + } + + @Test + public void testScanEstimateFlatWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 100; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 10 * blockSize; + long prevBBSDifference = 0; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // flat workload should not overestimate + assertEquals(maxBlockBytesScanned, estimate); + } + + @Test + public void testScanEstimateVariableFlatWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 1; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 10 * blockSize; + long prevBBSDifference = 0; + for (int i = 0; i < 100; i++) { + long variation = Math.round(Math.random() * blockSize); + if (variation % 2 == 0) { + variation *= -1; + } + // despite +/- <1 block variation, we consider this workload flat + prevBBSDifference = variation; + + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq + i, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // flat workload should not overestimate + assertEquals(maxBlockBytesScanned, estimate); + } + } + + @Test + public void testScanEstimateGrowingWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 100; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 20 * blockSize; + long prevBBSDifference = 10 * blockSize; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // growing workload should overestimate + assertTrue(nextCallSeq * maxBlockBytesScanned == estimate || maxScannerResultSize == estimate); + } + + @Test + public void testScanEstimateShrinkingWorkload() { + long blockSize = 64 * 1024; + long nextCallSeq = 100; + long maxScannerResultSize = 100 * 1024 * 1024; + long maxBlockBytesScanned = 20 * blockSize; + long prevBBSDifference = -10 * blockSize; + long estimate = DefaultOperationQuota.getScanReadConsumeEstimate(blockSize, nextCallSeq, + maxScannerResultSize, maxBlockBytesScanned, prevBBSDifference); + + // shrinking workload should only shrink estimate to maxBBS + assertEquals(maxBlockBytesScanned, estimate); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java index ec1b1d4bdcc..adfc46bb4a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java @@ -152,22 +152,21 @@ public final class ThrottleQuotaTestUtil { return opCount; } - static long doScans(int maxOps, Table table) { + static long doScans(int desiredRows, Table table, int caching) { int count = 0; - int caching = 100; try { Scan scan = new Scan(); scan.setCaching(caching); scan.setCacheBlocks(false); ResultScanner scanner = table.getScanner(scan); - while (count < (maxOps * caching)) { + while (count < desiredRows) { scanner.next(); count += 1; } } catch (IOException e) { LOG.error("scan failed after nRetries=" + count, e); } - return count / caching; + return count; } static void triggerUserCacheRefresh(HBaseTestingUtility testUtil, boolean bypass,