This is an automated email from the ASF dual-hosted git repository. rmattingly pushed a commit to branch HBASE-29231-branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c6238408c1a436764e72fe345164371aae00cc0b Author: Alex Hughes <[email protected]> AuthorDate: Tue Jun 3 22:04:42 2025 +0200 HBASE-29231 Throttles should support limits based on handler thread usage time (#7000) Co-authored-by: Alex Hughes <[email protected]> Signed-off-by: Ray Mattingly <[email protected]> --- .../hadoop/hbase/quotas/QuotaSettingsFactory.java | 4 + .../hbase/quotas/RpcThrottlingException.java | 9 +- .../hadoop/hbase/quotas/ThrottleSettings.java | 3 + .../apache/hadoop/hbase/quotas/ThrottleType.java | 3 + .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 4 + .../src/main/protobuf/server/Quota.proto | 3 + .../hadoop/hbase/quotas/DefaultOperationQuota.java | 56 +++++++-- .../hadoop/hbase/quotas/ExceedOperationQuota.java | 14 ++- .../hbase/quotas/GlobalQuotaSettingsImpl.java | 15 +++ .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 10 +- .../apache/hadoop/hbase/quotas/QuotaLimiter.java | 59 ++++++---- .../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 4 + .../hbase/quotas/RegionServerRpcQuotaManager.java | 15 ++- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 27 ++++- .../hbase/quotas/TestDefaultHandlerUsageQuota.java | 123 +++++++++++++++++++ .../hbase/quotas/TestDefaultOperationQuota.java | 24 ++-- .../apache/hadoop/hbase/quotas/TestQuotaState.java | 8 +- .../hbase/quotas/TestThreadHandlerUsageQuota.java | 131 +++++++++++++++++++++ 18 files changed, 459 insertions(+), 53 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java index 67f85da1d51..cea5fc7cb86 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/QuotaSettingsFactory.java @@ -180,6 +180,10 @@ public class QuotaSettingsFactory { settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer, ThrottleType.ATOMIC_REQUEST_NUMBER, throttle.getAtomicReqNum())); } + if (throttle.hasReqHandlerUsageMs()) { + settings.add(ThrottleSettings.fromTimedQuota(userName, tableName, namespace, regionServer, + ThrottleType.REQUEST_HANDLER_USAGE_MS, throttle.getReqHandlerUsageMs())); + } return settings; } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java index dfa8eacb13b..d4ab38f5bf7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java @@ -44,13 +44,15 @@ public class RpcThrottlingException extends HBaseIOException { AtomicRequestNumberExceeded, AtomicReadSizeExceeded, AtomicWriteSizeExceeded, + RequestHandlerUsageTimeExceeded, } private static final String[] MSG_TYPE = new String[] { "number of requests exceeded", "request size limit exceeded", "number of read requests exceeded", "number of write requests exceeded", "write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded", "read capacity unit exceeded", "write capacity unit exceeded", - "atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded" }; + "atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded", + "request handler usage time exceeded" }; private static final String MSG_WAIT = " - wait "; @@ -145,6 +147,11 @@ public class RpcThrottlingException extends HBaseIOException { throwThrottlingException(Type.AtomicWriteSizeExceeded, waitInterval); } + public static void throwRequestHandlerUsageTimeExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.RequestHandlerUsageTimeExceeded, waitInterval); + } + private static void throwThrottlingException(final Type type, final long waitInterval) throws RpcThrottlingException { String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + stringFromMillis(waitInterval); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java index efde451c122..336848690d6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java @@ -108,6 +108,9 @@ public class ThrottleSettings extends QuotaSettings { case WRITE_CAPACITY_UNIT: builder.append(String.format("%dCU", timedQuota.getSoftLimit())); break; + case REQUEST_HANDLER_USAGE_MS: + builder.append(String.format("%dms", timedQuota.getSoftLimit())); + break; default: } } else if (timedQuota.hasShare()) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java index 2c5a25acc2c..277451a3e2e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java @@ -59,4 +59,7 @@ public enum ThrottleType { /** Throttling based on the size of atomic write requests */ ATOMIC_WRITE_SIZE, + + /** Throttling based on the handler thread time in milliseconds used */ + REQUEST_HANDLER_USAGE_MS, } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index 616e0e37457..60175137ad2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2496,6 +2496,8 @@ public final class ProtobufUtil { return ThrottleType.ATOMIC_REQUEST_NUMBER; case ATOMIC_WRITE_SIZE: return ThrottleType.ATOMIC_WRITE_SIZE; + case REQUEST_HANDLER_USAGE_MS: + return ThrottleType.REQUEST_HANDLER_USAGE_MS; default: throw new RuntimeException("Invalid ThrottleType " + proto); } @@ -2531,6 +2533,8 @@ public final class ProtobufUtil { return QuotaProtos.ThrottleType.ATOMIC_REQUEST_NUMBER; case ATOMIC_WRITE_SIZE: return QuotaProtos.ThrottleType.ATOMIC_WRITE_SIZE; + case REQUEST_HANDLER_USAGE_MS: + return QuotaProtos.ThrottleType.REQUEST_HANDLER_USAGE_MS; default: throw new RuntimeException("Invalid ThrottleType " + type); } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto index e524e015b62..f28cb701646 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto @@ -52,6 +52,7 @@ enum ThrottleType { ATOMIC_READ_SIZE = 10; ATOMIC_REQUEST_NUMBER = 11; ATOMIC_WRITE_SIZE = 12; + REQUEST_HANDLER_USAGE_MS = 13; } message Throttle { @@ -71,6 +72,8 @@ message Throttle { optional TimedQuota atomic_read_size = 10; optional TimedQuota atomic_req_num = 11; optional TimedQuota atomic_write_size = 12; + + optional TimedQuota req_handler_usage_ms = 13; } message ThrottleRequest { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index f153eca2e5a..16082bff98f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -21,10 +21,12 @@ import java.util.Arrays; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.ipc.RpcCall; import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceStability; @@ -45,11 +47,18 @@ public class DefaultOperationQuota implements OperationQuota { // the available read/write quota size in bytes protected long readAvailable = 0; + + // The estimated handler usage time in ms for a request based on + // the number of requests per second and the number of handler threads + private final long estimatedHandlerUsagePerReq; + // estimated quota protected long writeConsumed = 0; protected long readConsumed = 0; protected long writeCapacityUnitConsumed = 0; protected long readCapacityUnitConsumed = 0; + protected long handlerUsageTimeConsumed = 0; + // real consumed quota private final long[] operationSize; // difference between estimated quota and real consumed quota used in close method @@ -59,14 +68,15 @@ public class DefaultOperationQuota implements OperationQuota { protected long readDiff = 0; protected long writeCapacityUnitDiff = 0; protected long readCapacityUnitDiff = 0; + protected long handlerUsageTimeDiff = 0; private boolean useResultSizeBytes; private long blockSizeBytes; private long maxScanEstimate; private boolean isAtomic = false; public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, - final QuotaLimiter... limiters) { - this(conf, Arrays.asList(limiters)); + final double requestsPerSecond, final QuotaLimiter... limiters) { + this(conf, requestsPerSecond, Arrays.asList(limiters)); this.useResultSizeBytes = conf.getBoolean(OperationQuota.USE_RESULT_SIZE_BYTES, USE_RESULT_SIZE_BYTES_DEFAULT); this.blockSizeBytes = blockSizeBytes; @@ -78,15 +88,20 @@ public class DefaultOperationQuota implements OperationQuota { /** * NOTE: The order matters. It should be something like [user, table, namespace, global] */ - public DefaultOperationQuota(final Configuration conf, final List<QuotaLimiter> limiters) { + public DefaultOperationQuota(final Configuration conf, final double requestsPerSecond, + final List<QuotaLimiter> limiters) { this.writeCapacityUnit = conf.getLong(QuotaUtil.WRITE_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_WRITE_CAPACITY_UNIT); this.readCapacityUnit = conf.getLong(QuotaUtil.READ_CAPACITY_UNIT_CONF_KEY, QuotaUtil.DEFAULT_READ_CAPACITY_UNIT); this.limiters = limiters; + int numHandlerThreads = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT); + this.estimatedHandlerUsagePerReq = + calculateHandlerUsageTimeEstimate(requestsPerSecond, numHandlerThreads); + int size = OperationType.values().length; operationSize = new long[size]; - for (int i = 0; i < size; ++i) { operationSize[i] = 0; } @@ -128,13 +143,13 @@ public class DefaultOperationQuota implements OperationQuota { limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, - readCapacityUnitConsumed, isAtomic); + readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } for (final QuotaLimiter limiter : limiters) { limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, - readCapacityUnitConsumed, isAtomic); + readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed); } } @@ -152,12 +167,12 @@ public class DefaultOperationQuota implements OperationQuota { RpcServer.getCurrentCall().map(RpcCall::getBlockBytesScanned).orElse(0L); readDiff = Math.max(blockBytesScanned, resultSize) - readConsumed; } - writeCapacityUnitDiff = calculateWriteCapacityUnitDiff(operationSize[OperationType.MUTATE.ordinal()], writeConsumed); readCapacityUnitDiff = calculateReadCapacityUnitDiff( operationSize[OperationType.GET.ordinal()] + operationSize[OperationType.SCAN.ordinal()], readConsumed); + handlerUsageTimeDiff = calculateHandlerUsageMsDiff(); for (final QuotaLimiter limiter : limiters) { if (writeDiff != 0) { @@ -166,6 +181,9 @@ public class DefaultOperationQuota implements OperationQuota { if (readDiff != 0) { limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic); } + if (handlerUsageTimeDiff != 0) { + limiter.consumeTime(handlerUsageTimeDiff); + } } } @@ -216,6 +234,8 @@ public class DefaultOperationQuota implements OperationQuota { writeCapacityUnitConsumed = calculateWriteCapacityUnit(writeConsumed); readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); + + handlerUsageTimeConsumed = (numReads + numWrites) * estimatedHandlerUsagePerReq; } /** @@ -238,6 +258,7 @@ public class DefaultOperationQuota implements OperationQuota { } readCapacityUnitConsumed = calculateReadCapacityUnit(readConsumed); + handlerUsageTimeConsumed = estimatedHandlerUsagePerReq; } protected static long getScanReadConsumeEstimate(long blockSizeBytes, long nextCallSeq, @@ -288,4 +309,25 @@ public class DefaultOperationQuota implements OperationQuota { private long calculateReadCapacityUnitDiff(final long actualSize, final long estimateSize) { return calculateReadCapacityUnit(actualSize) - calculateReadCapacityUnit(estimateSize); } + + private long calculateHandlerUsageTimeEstimate(final double requestsPerSecond, + final int numHandlerThreads) { + if (requestsPerSecond <= numHandlerThreads) { + // If less than 1 request per second per handler thread, then we use the number of handler + // threads as a baseline to avoid incorrect estimations when the number of requests is very + // low. + return numHandlerThreads; + } else { + double requestsPerMillisecond = Math.ceil(requestsPerSecond / 1000); + // We don't ever want zero here + return Math.max((long) requestsPerMillisecond, 1L); + } + } + + private long calculateHandlerUsageMsDiff() { + long currentTime = EnvironmentEdgeManager.currentTime(); + long startTime = RpcServer.getCurrentCall().map(RpcCall::getStartTime).orElse(currentTime); + long timeElapsed = currentTime - startTime; + return handlerUsageTimeConsumed - timeElapsed; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 7dcfec6b062..eb8824685c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -43,8 +43,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota { private QuotaLimiter regionServerLimiter; public ExceedOperationQuota(final Configuration conf, int blockSizeBytes, - QuotaLimiter regionServerLimiter, final QuotaLimiter... limiters) { - super(conf, blockSizeBytes, limiters); + final double requestsPerSecond, QuotaLimiter regionServerLimiter, + final QuotaLimiter... limiters) { + super(conf, blockSizeBytes, requestsPerSecond, limiters); this.regionServerLimiter = regionServerLimiter; } @@ -78,7 +79,7 @@ public class ExceedOperationQuota extends DefaultOperationQuota { estimateQuota.run(); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic); + writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed); // 3. Check if other limiters are enough. If not, exceed other limiters because region server // limiter is enough. boolean exceed = false; @@ -94,13 +95,13 @@ public class ExceedOperationQuota extends DefaultOperationQuota { // 4. Region server limiter is enough and grab estimated consume quota. readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic); + writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic, handlerUsageTimeConsumed); if (exceed) { // 5. Other quota limiter is exceeded and has not been grabbed (because throw // RpcThrottlingException in Step 3), so grab it. for (final QuotaLimiter limiter : limiters) { limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic); + writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic, 0L); } } } @@ -115,6 +116,9 @@ public class ExceedOperationQuota extends DefaultOperationQuota { if (readDiff != 0) { regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff, false); } + if (handlerUsageTimeDiff != 0) { + regionServerLimiter.consumeTime(handlerUsageTimeDiff); + } } private interface CheckQuotaRunnable { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java index 6afbebc6e86..2ce17452a32 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java @@ -174,6 +174,11 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { hasThrottle = true; } break; + case REQUEST_HANDLER_USAGE_MS: + if (throttleBuilder.hasReqHandlerUsageMs()) { + hasThrottle = true; + } + break; default: } return hasThrottle; @@ -236,6 +241,9 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case ATOMIC_WRITE_SIZE: throttleBuilder.clearAtomicWriteSize(); break; + case REQUEST_HANDLER_USAGE_MS: + throttleBuilder.clearReqHandlerUsageMs(); + break; default: } boolean hasThrottle = false; @@ -295,6 +303,8 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case ATOMIC_WRITE_SIZE: throttleBuilder.setAtomicWriteSize(otherProto.getTimedQuota()); break; + case REQUEST_HANDLER_USAGE_MS: + throttleBuilder.setReqHandlerUsageMs(otherProto.getTimedQuota()); default: } } @@ -388,7 +398,12 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case READ_CAPACITY_UNIT: case WRITE_CAPACITY_UNIT: builder.append(String.format("%dCU", timedQuota.getSoftLimit())); + break; + case REQUEST_HANDLER_USAGE_MS: + builder.append(String.format("%dms", timedQuota.getSoftLimit())); + break; default: + // no-op } } else if (timedQuota.hasShare()) { builder.append(String.format("%.2f%%", timedQuota.getShare())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 7c02dbc1134..b75dfbbb7dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -35,13 +35,14 @@ class NoopQuotaLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit, - boolean isAtomic) throws RpcThrottlingException { + boolean isAtomic, long estimateHandlerThreadUsageMs) throws RpcThrottlingException { // no-op } @Override public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) { + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic, + long estimateHandlerThreadUsageMs) { // no-op } @@ -55,6 +56,11 @@ class NoopQuotaLimiter implements QuotaLimiter { // no-op } + @Override + public void consumeTime(final long handlerMillisUsed) { + // no-op + } + @Override public boolean isBypass() { return true; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 1b5a1302a20..a98a1ca59b2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -28,36 +28,48 @@ import org.apache.yetus.audience.InterfaceStability; public interface QuotaLimiter { /** * Checks if it is possible to execute the specified operation. - * @param writeReqs the write requests that will be checked against the available - * quota - * @param estimateWriteSize the write size that will be checked against the available - * quota - * @param readReqs the read requests that will be checked against the available - * quota - * @param estimateReadSize the read size that will be checked against the available quota - * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the - * available quota - * @param estimateReadCapacityUnit the read capacity unit that will be checked against the - * available quota + * @param writeReqs the write requests that will be checked against the + * available quota + * @param estimateWriteSize the write size that will be checked against the available + * quota + * @param readReqs the read requests that will be checked against the + * available quota + * @param estimateReadSize the read size that will be checked against the available + * quota + * @param estimateWriteCapacityUnit the write capacity unit that will be checked against the + * available quota + * @param estimateReadCapacityUnit the read capacity unit that will be checked against the + * available quota + * @param isAtomic if the request performs an atomic operation + * @param estimateHandlerThreadUsageMs the estimated handler usage time in ms that will be checked + * against the available quota * @throws RpcThrottlingException thrown if not enough available resources to perform operation. */ void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, - long estimateWriteCapacityUnit, long estimateReadCapacityUnit, boolean isAtomic) - throws RpcThrottlingException; + long estimateWriteCapacityUnit, long estimateReadCapacityUnit, boolean isAtomic, + long estimateHandlerThreadUsageMs) throws RpcThrottlingException; /** * Removes the specified write and read amount from the quota. At this point the write and read * amount will be an estimate, that will be later adjusted with a consumeWrite()/consumeRead() * call. - * @param writeReqs the write requests that will be removed from the current quota - * @param writeSize the write size that will be removed from the current quota - * @param readReqs the read requests that will be removed from the current quota - * @param readSize the read size that will be removed from the current quota - * @param writeCapacityUnit the write capacity unit that will be removed from the current quota - * @param readCapacityUnit the read capacity unit num that will be removed from the current quota + * @param writeReqs the write requests that will be removed from the current + * quota + * @param writeSize the write size that will be removed from the current quota + * @param readReqs the read requests that will be removed from the current + * quota + * @param readSize the read size that will be removed from the current quota + * @param writeCapacityUnit the write capacity unit that will be removed from the + * current quota + * @param readCapacityUnit the read capacity unit num that will be removed from the + * current quota + * @param isAtomic if the request performs an atomic operation + * @param estimateHandlerThreadUsageMs the estimated handler usage time in ms that will be removed + * from the available quota */ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit, boolean isAtomic); + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic, + long estimateHandlerThreadUsageMs); /** * Removes or add back some write amount to the quota. (called at the end of an operation in case @@ -71,6 +83,13 @@ public interface QuotaLimiter { */ void consumeRead(long size, long capacityUnit, boolean isAtomic); + /** + * Removes or add back some handler thread usage milliseconds to the quota. (called at the end of + * an operation in case the estimate quota was off) + * @param handlerMillisUsed the actual elapsed time used processing the request + */ + void consumeTime(long handlerMillisUsed); + /** Returns true if the limiter is a noop */ boolean isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 222645ca998..68752278383 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -102,6 +102,8 @@ public class QuotaUtil extends QuotaTableUtil { "hbase.quota.default.user.machine.atomic.request.num"; public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE = "hbase.quota.default.user.machine.atomic.write.size"; + public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS = + "hbase.quota.default.user.machine.request.handler.usage.ms"; /** Table descriptor for Quota internal table */ public static final TableDescriptor QUOTA_TABLE_DESC = @@ -401,6 +403,8 @@ public class QuotaUtil extends QuotaTableUtil { .ifPresent(throttleBuilder::setAtomicReqNum); buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE) .ifPresent(throttleBuilder::setAtomicWriteSize); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS) + .ifPresent(throttleBuilder::setReqHandlerUsageMs); UserQuotaState state = new UserQuotaState(nowTs); QuotaProtos.Quotas defaultQuotas = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index d847a9eb3dc..7c23666490d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.quotas; import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.ipc.RpcScheduler; @@ -33,6 +35,8 @@ import org.apache.yetus.audience.InterfaceStability; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.base.Suppliers; + import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; /** @@ -52,11 +56,15 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { private volatile boolean rpcThrottleEnabled; // Storage for quota rpc throttle private RpcThrottleStorage rpcThrottleStorage; + private final Supplier<Double> requestsPerSecondSupplier; public RegionServerRpcQuotaManager(final RegionServerServices rsServices) { this.rsServices = rsServices; rpcThrottleStorage = new RpcThrottleStorage(rsServices.getZooKeeper(), rsServices.getConfiguration()); + this.requestsPerSecondSupplier = Suppliers.memoizeWithExpiration( + () -> rsServices.getMetrics().getRegionServerWrapper().getRequestsPerSecond(), 1, + TimeUnit.MINUTES); } public void start(final RpcScheduler rpcScheduler) throws IOException { @@ -119,6 +127,7 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { if (isQuotaEnabled() && !table.isSystemTable() && isRpcThrottleEnabled()) { UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi); QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table); + boolean useNoop = userLimiter.isBypass(); if (userQuotaState.hasBypassGlobals()) { if (LOG.isTraceEnabled()) { @@ -126,7 +135,7 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { } if (!useNoop) { return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, - userLimiter); + requestsPerSecondSupplier.get(), userLimiter); } } else { QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString()); @@ -143,10 +152,10 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { if (!useNoop) { if (exceedThrottleQuotaEnabled) { return new ExceedOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, - rsLimiter, userLimiter, tableLimiter, nsLimiter); + requestsPerSecondSupplier.get(), rsLimiter, userLimiter, tableLimiter, nsLimiter); } else { return new DefaultOperationQuota(this.rsServices.getConfiguration(), blockSizeBytes, - userLimiter, tableLimiter, nsLimiter, rsLimiter); + requestsPerSecondSupplier.get(), userLimiter, tableLimiter, nsLimiter, rsLimiter); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index e62d98242e4..232471092c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -45,6 +45,7 @@ public class TimeBasedLimiter implements QuotaLimiter { private RateLimiter atomicReqLimiter = null; private RateLimiter atomicReadSizeLimiter = null; private RateLimiter atomicWriteSizeLimiter = null; + private RateLimiter reqHandlerUsageTimeLimiter = null; private TimeBasedLimiter() { if ( @@ -66,6 +67,7 @@ public class TimeBasedLimiter implements QuotaLimiter { atomicReqLimiter = new FixedIntervalRateLimiter(refillInterval); atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval); atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval); + reqHandlerUsageTimeLimiter = new FixedIntervalRateLimiter(refillInterval); } else { reqsLimiter = new AverageIntervalRateLimiter(); reqSizeLimiter = new AverageIntervalRateLimiter(); @@ -79,6 +81,7 @@ public class TimeBasedLimiter implements QuotaLimiter { atomicReqLimiter = new AverageIntervalRateLimiter(); atomicReadSizeLimiter = new AverageIntervalRateLimiter(); atomicWriteSizeLimiter = new AverageIntervalRateLimiter(); + reqHandlerUsageTimeLimiter = new AverageIntervalRateLimiter(); } } @@ -145,6 +148,11 @@ public class TimeBasedLimiter implements QuotaLimiter { isBypass = false; } + if (throttle.hasReqHandlerUsageMs()) { + setFromTimedQuota(limiter.reqHandlerUsageTimeLimiter, throttle.getReqHandlerUsageMs()); + isBypass = false; + } + return isBypass ? NoopQuotaLimiter.get() : limiter; } @@ -161,6 +169,7 @@ public class TimeBasedLimiter implements QuotaLimiter { atomicReqLimiter.update(other.atomicReqLimiter); atomicReadSizeLimiter.update(other.atomicReadSizeLimiter); atomicWriteSizeLimiter.update(other.atomicWriteSizeLimiter); + reqHandlerUsageTimeLimiter.update(other.reqHandlerUsageTimeLimiter); } private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) { @@ -170,7 +179,7 @@ public class TimeBasedLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit, - boolean isAtomic) throws RpcThrottlingException { + boolean isAtomic, long estimatedReqHandlerUsageTimeMs) throws RpcThrottlingException { long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs); if (waitInterval > 0) { RpcThrottlingException.throwNumRequestsExceeded(waitInterval); @@ -232,11 +241,16 @@ public class TimeBasedLimiter implements QuotaLimiter { } } } + waitInterval = reqHandlerUsageTimeLimiter.getWaitIntervalMs(estimatedReqHandlerUsageTimeMs); + if (waitInterval > 0) { + RpcThrottlingException.throwRequestHandlerUsageTimeExceeded(waitInterval); + } } @Override public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) { + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic, + long estimateHandlerThreadUsageMs) { assert writeSize != 0 || readSize != 0; reqsLimiter.consume(writeReqs + readReqs); @@ -267,6 +281,7 @@ public class TimeBasedLimiter implements QuotaLimiter { atomicWriteSizeLimiter.consume(writeSize); } } + reqHandlerUsageTimeLimiter.consume(estimateHandlerThreadUsageMs); } @Override @@ -291,6 +306,11 @@ public class TimeBasedLimiter implements QuotaLimiter { } } + @Override + public void consumeTime(final long handlerMillisUsed) { + reqHandlerUsageTimeLimiter.consume(handlerMillisUsed); + } + @Override public boolean isBypass() { return false; @@ -377,6 +397,9 @@ public class TimeBasedLimiter implements QuotaLimiter { if (!atomicWriteSizeLimiter.isBypass()) { builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter); } + if (!reqHandlerUsageTimeLimiter.isBypass()) { + builder.append(" reqHandlerUsageTimeLimiter=" + reqHandlerUsageTimeLimiter); + } builder.append(')'); return builder.toString(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java new file mode 100644 index 00000000000..36fb9b35403 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultHandlerUsageQuota.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDefaultHandlerUsageQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultHandlerUsageQuota.class); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); + private static final int REFRESH_TIME = 5; + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + @AfterClass + public static void tearDown() throws Exception { + ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); + EnvironmentEdgeManager.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + + @BeforeClass + public static void setUp() throws Exception { + // quotas enabled, using block bytes scanned + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + // Set default to very strict + TEST_UTIL.getConfiguration() + .setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_REQUEST_HANDLER_USAGE_MS, 10); + + // don't cache blocks to make IO predictable + TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + TEST_UTIL.flush(TABLE_NAME); + } + + @Test + public void testDefaultHandlerUsageLimits() throws Exception { + // Should have a strict throttle by default + TEST_UTIL.waitFor(60_000, () -> runPutTest(100) < 100); + + // Add big quota and should be effectively unlimited + configureLenientThrottle(); + // Should run without error + TEST_UTIL.waitFor(60_000, () -> runPutTest(100) == 100); + + // Remove all the limits, and should revert to strict default + unsetQuota(); + TEST_UTIL.waitFor(60_000, () -> runPutTest(100) < 100); + } + + private void configureLenientThrottle() throws IOException { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(), + ThrottleType.REQUEST_HANDLER_USAGE_MS, 100_000, TimeUnit.SECONDS)); + } + } + + private static String getUserName() throws IOException { + return User.getCurrent().getShortName(); + } + + private void unsetQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(getUserName())); + } + } + + private long runPutTest(int attempts) throws Exception { + try (Table table = getTable()) { + return ThrottleQuotaTestUtil.doPuts(attempts, FAMILY, QUALIFIER, table); + } + } + + private Table getTable() throws IOException { + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) + .build(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index beeab8aef5c..c22a03f8db0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -41,6 +41,7 @@ public class TestDefaultOperationQuota { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); + private static final int DEFAULT_REQUESTS_PER_SECOND = 1000; private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); static { envEdge.setValue(EnvironmentEdgeManager.currentTime()); @@ -150,7 +151,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); // use the whole limit quota.checkBatchQuota(0, limit, false); @@ -171,7 +173,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); // use the whole limit quota.checkBatchQuota(limit, 0, false); @@ -192,7 +195,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); // use more than the limit, which should succeed rather than being indefinitely blocked quota.checkBatchQuota(0, 10 + limit, false); @@ -213,7 +217,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); // use more than the limit, which should succeed rather than being indefinitely blocked quota.checkBatchQuota(10 + limit, 0, false); @@ -234,7 +239,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), 65536, DEFAULT_REQUESTS_PER_SECOND, limiter); // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked quota.checkBatchQuota(1, 0, false); @@ -256,8 +262,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = - new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, + DEFAULT_REQUESTS_PER_SECOND, limiter); // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked quota.checkBatchQuota(0, 1, false); @@ -279,8 +285,8 @@ public class TestDefaultOperationQuota { QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); - DefaultOperationQuota quota = - new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), (int) blockSize, + DEFAULT_REQUESTS_PER_SECOND, limiter); // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked quota.checkBatchQuota(0, 1, false); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index d64b1002b1e..59b26f3f0d9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -224,7 +224,7 @@ public class TestQuotaState { assertFalse(quotaInfo.isBypass()); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); try { - limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0, false); + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0, false, 0L); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -241,7 +241,7 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1, 0, 0, 1, 0, false); + limiter.checkQuota(1, 1, 0, 0, 1, 0, false, 0L); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -251,11 +251,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1, 0, 0, 1, 0, false); + limiter.checkQuota(1, 1, 0, 0, 1, 0, false, 0L); } catch (RpcThrottlingException e) { fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1, 0, 0, 1, 0, false); + limiter.grabQuota(1, 1, 0, 0, 1, 0, false, 0L); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java new file mode 100644 index 00000000000..5c446b6d7c2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestThreadHandlerUsageQuota.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestThreadHandlerUsageQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestThreadHandlerUsageQuota.class); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); + private static final int REFRESH_TIME = 5; + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + private static final int MAX_OPS = 1000; + + @AfterClass + public static void tearDown() throws Exception { + ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); + EnvironmentEdgeManager.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // Enable quotas + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + + // Don't cache blocks to make IO predictable + TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + + TEST_UTIL.waitTableAvailable(TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + TEST_UTIL.flush(TABLE_NAME); + } + + @Test + public void testHandlerUsageThrottleForReads() throws Exception { + try (Table table = getTable()) { + unthrottleUser(); + long unthrottledAttempts = ThrottleQuotaTestUtil.doGets(MAX_OPS, FAMILY, QUALIFIER, table); + + configureThrottle(); + long throttledAttempts = ThrottleQuotaTestUtil.doGets(MAX_OPS, FAMILY, QUALIFIER, table); + assertTrue("Throttled attempts should be less than unthrottled attempts", + throttledAttempts < unthrottledAttempts); + } + } + + @Test + public void testHandlerUsageThrottleForWrites() throws Exception { + try (Table table = getTable()) { + unthrottleUser(); + long unthrottledAttempts = ThrottleQuotaTestUtil.doPuts(MAX_OPS, FAMILY, QUALIFIER, table); + + configureThrottle(); + long throttledAttempts = ThrottleQuotaTestUtil.doPuts(MAX_OPS, FAMILY, QUALIFIER, table); + assertTrue("Throttled attempts should be less than unthrottled attempts", + throttledAttempts < unthrottledAttempts); + } + } + + private void configureThrottle() throws IOException { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(), + ThrottleType.REQUEST_HANDLER_USAGE_MS, 10000, TimeUnit.SECONDS)); + } + } + + private void unthrottleUser() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.unthrottleUserByThrottleType(getUserName(), + ThrottleType.REQUEST_HANDLER_USAGE_MS)); + } + } + + private static String getUserName() throws IOException { + return User.getCurrent().getShortName(); + } + + private Table getTable() throws IOException { + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) + .build(); + } +}
