This is an automated email from the ASF dual-hosted git repository. rmattingly pushed a commit to branch 29229-branch-3 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit f6da0bee922bfc835b648855e6f021b40869e6e8 Author: Ray Mattingly <[email protected]> AuthorDate: Thu Apr 3 10:34:41 2025 -0400 HBASE-29229 Throttles should support specific restrictions for atomic workloads (#6866) Co-authored-by: Ray Mattingly <[email protected]> Signed-off-by: Nick Dimiduk <[email protected]> --- .../hbase/quotas/RpcThrottlingException.java | 30 +++- .../hadoop/hbase/quotas/ThrottleSettings.java | 3 + .../apache/hadoop/hbase/quotas/ThrottleType.java | 9 ++ .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 12 ++ .../src/main/protobuf/server/Quota.proto | 7 + .../hadoop/hbase/quotas/DefaultOperationQuota.java | 23 +-- .../hadoop/hbase/quotas/ExceedOperationQuota.java | 21 +-- .../hbase/quotas/GlobalQuotaSettingsImpl.java | 36 +++++ .../hadoop/hbase/quotas/NoopOperationQuota.java | 3 +- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 10 +- .../apache/hadoop/hbase/quotas/OperationQuota.java | 2 +- .../org/apache/hadoop/hbase/quotas/QuotaCache.java | 4 + .../apache/hadoop/hbase/quotas/QuotaLimiter.java | 9 +- .../org/apache/hadoop/hbase/quotas/QuotaUtil.java | 12 ++ .../hbase/quotas/RegionServerRpcQuotaManager.java | 17 ++- .../hadoop/hbase/quotas/RpcQuotaManager.java | 2 +- .../hadoop/hbase/quotas/TimeBasedLimiter.java | 80 ++++++++++- .../hbase/regionserver/RegionCoprocessorHost.java | 2 +- .../hadoop/hbase/quotas/TestAtomicReadQuota.java | 152 +++++++++++++++----- .../hbase/quotas/TestDefaultAtomicQuota.java | 160 +++++++++++++++++++++ .../hbase/quotas/TestDefaultOperationQuota.java | 42 +++--- .../hbase/quotas/TestNoopOperationQuota.java | 3 +- .../apache/hadoop/hbase/quotas/TestQuotaAdmin.java | 8 ++ .../apache/hadoop/hbase/quotas/TestQuotaState.java | 8 +- .../hadoop/hbase/quotas/ThrottleQuotaTestUtil.java | 20 ++- .../hbase/regionserver/TestScannerLeaseCount.java | 4 +- 26 files changed, 567 insertions(+), 112 deletions(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java index 2c1f13e94e6..dfa8eacb13b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/RpcThrottlingException.java @@ -40,14 +40,17 @@ public class RpcThrottlingException extends HBaseIOException { ReadSizeExceeded, RequestCapacityUnitExceeded, ReadCapacityUnitExceeded, - WriteCapacityUnitExceeded + WriteCapacityUnitExceeded, + AtomicRequestNumberExceeded, + AtomicReadSizeExceeded, + AtomicWriteSizeExceeded, } - private static final String[] MSG_TYPE = - new String[] { "number of requests exceeded", "request size limit exceeded", - "number of read requests exceeded", "number of write requests exceeded", - "write size limit exceeded", "read size limit exceeded", "request capacity unit exceeded", - "read capacity unit exceeded", "write capacity unit exceeded" }; + private static final String[] MSG_TYPE = new String[] { "number of requests exceeded", + "request size limit exceeded", "number of read requests exceeded", + "number of write requests exceeded", "write size limit exceeded", "read size limit exceeded", + "request capacity unit exceeded", "read capacity unit exceeded", "write capacity unit exceeded", + "atomic request number exceeded", "atomic read size exceeded", "atomic write size exceeded" }; private static final String MSG_WAIT = " - wait "; @@ -127,6 +130,21 @@ public class RpcThrottlingException extends HBaseIOException { throwThrottlingException(Type.WriteCapacityUnitExceeded, waitInterval); } + public static void throwAtomicRequestNumberExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.AtomicRequestNumberExceeded, waitInterval); + } + + public static void throwAtomicReadSizeExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.AtomicReadSizeExceeded, waitInterval); + } + + public static void throwAtomicWriteSizeExceeded(final long waitInterval) + throws RpcThrottlingException { + throwThrottlingException(Type.AtomicWriteSizeExceeded, waitInterval); + } + private static void throwThrottlingException(final Type type, final long waitInterval) throws RpcThrottlingException { String msg = MSG_TYPE[type.ordinal()] + MSG_WAIT + stringFromMillis(waitInterval); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java index 01dfc3709ae..efde451c122 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleSettings.java @@ -93,11 +93,14 @@ public class ThrottleSettings extends QuotaSettings { case REQUEST_NUMBER: case WRITE_NUMBER: case READ_NUMBER: + case ATOMIC_REQUEST_NUMBER: builder.append(String.format("%dreq", timedQuota.getSoftLimit())); break; case REQUEST_SIZE: case WRITE_SIZE: case READ_SIZE: + case ATOMIC_READ_SIZE: + case ATOMIC_WRITE_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; case REQUEST_CAPACITY_UNIT: diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java index 80827dafe6d..2c5a25acc2c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottleType.java @@ -50,4 +50,13 @@ public enum ThrottleType { /** Throttling based on the read data capacity unit */ READ_CAPACITY_UNIT, + + /** Throttling based on the IO footprint of an atomic request */ + ATOMIC_READ_SIZE, + + /** Throttling based on the number of atomic requests per time-unit */ + ATOMIC_REQUEST_NUMBER, + + /** Throttling based on the size of atomic write requests */ + ATOMIC_WRITE_SIZE, } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java index e9eb085ee7b..465d9e95ae8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java @@ -2468,6 +2468,12 @@ public final class ProtobufUtil { return ThrottleType.READ_CAPACITY_UNIT; case WRITE_CAPACITY_UNIT: return ThrottleType.WRITE_CAPACITY_UNIT; + case ATOMIC_READ_SIZE: + return ThrottleType.ATOMIC_READ_SIZE; + case ATOMIC_REQUEST_NUMBER: + return ThrottleType.ATOMIC_REQUEST_NUMBER; + case ATOMIC_WRITE_SIZE: + return ThrottleType.ATOMIC_WRITE_SIZE; default: throw new RuntimeException("Invalid ThrottleType " + proto); } @@ -2497,6 +2503,12 @@ public final class ProtobufUtil { return QuotaProtos.ThrottleType.READ_CAPACITY_UNIT; case WRITE_CAPACITY_UNIT: return QuotaProtos.ThrottleType.WRITE_CAPACITY_UNIT; + case ATOMIC_READ_SIZE: + return QuotaProtos.ThrottleType.ATOMIC_READ_SIZE; + case ATOMIC_REQUEST_NUMBER: + return QuotaProtos.ThrottleType.ATOMIC_REQUEST_NUMBER; + case ATOMIC_WRITE_SIZE: + return QuotaProtos.ThrottleType.ATOMIC_WRITE_SIZE; default: throw new RuntimeException("Invalid ThrottleType " + type); } diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto b/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto index 5b00d74980b..e524e015b62 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/Quota.proto @@ -49,6 +49,9 @@ enum ThrottleType { REQUEST_CAPACITY_UNIT = 7; WRITE_CAPACITY_UNIT = 8; READ_CAPACITY_UNIT = 9; + ATOMIC_READ_SIZE = 10; + ATOMIC_REQUEST_NUMBER = 11; + ATOMIC_WRITE_SIZE = 12; } message Throttle { @@ -64,6 +67,10 @@ message Throttle { optional TimedQuota req_capacity_unit = 7; optional TimedQuota write_capacity_unit = 8; optional TimedQuota read_capacity_unit = 9; + + optional TimedQuota atomic_read_size = 10; + optional TimedQuota atomic_req_num = 11; + optional TimedQuota atomic_write_size = 12; } message ThrottleRequest { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 29c3667fb35..f153eca2e5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -62,6 +62,7 @@ public class DefaultOperationQuota implements OperationQuota { private boolean useResultSizeBytes; private long blockSizeBytes; private long maxScanEstimate; + private boolean isAtomic = false; public DefaultOperationQuota(final Configuration conf, final int blockSizeBytes, final QuotaLimiter... limiters) { @@ -92,9 +93,10 @@ public class DefaultOperationQuota implements OperationQuota { } @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { updateEstimateConsumeBatchQuota(numWrites, numReads); - checkQuota(numWrites, numReads); + checkQuota(numWrites, numReads, isAtomic); } @Override @@ -102,10 +104,15 @@ public class DefaultOperationQuota implements OperationQuota { long maxBlockBytesScanned, long prevBlockBytesScannedDifference) throws RpcThrottlingException { updateEstimateConsumeScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); - checkQuota(0, 1); + checkQuota(0, 1, false); } - private void checkQuota(long numWrites, long numReads) throws RpcThrottlingException { + private void checkQuota(long numWrites, long numReads, boolean isAtomic) + throws RpcThrottlingException { + if (isAtomic) { + // Remember this flag for later use in close() + this.isAtomic = true; + } readAvailable = Long.MAX_VALUE; for (final QuotaLimiter limiter : limiters) { if (limiter.isBypass()) { @@ -121,13 +128,13 @@ public class DefaultOperationQuota implements OperationQuota { limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, - readCapacityUnitConsumed); + readCapacityUnitConsumed, isAtomic); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } for (final QuotaLimiter limiter : limiters) { limiter.grabQuota(numWrites, writeConsumed, numReads, readConsumed, writeCapacityUnitConsumed, - readCapacityUnitConsumed); + readCapacityUnitConsumed, isAtomic); } } @@ -154,10 +161,10 @@ public class DefaultOperationQuota implements OperationQuota { for (final QuotaLimiter limiter : limiters) { if (writeDiff != 0) { - limiter.consumeWrite(writeDiff, writeCapacityUnitDiff); + limiter.consumeWrite(writeDiff, writeCapacityUnitDiff, isAtomic); } if (readDiff != 0) { - limiter.consumeRead(readDiff, readCapacityUnitDiff); + limiter.consumeRead(readDiff, readCapacityUnitDiff, isAtomic); } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java index 3077d6dac53..7dcfec6b062 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/ExceedOperationQuota.java @@ -49,10 +49,11 @@ public class ExceedOperationQuota extends DefaultOperationQuota { } @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { Runnable estimateQuota = () -> updateEstimateConsumeBatchQuota(numWrites, numReads); - CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads); - checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0); + CheckQuotaRunnable checkQuota = () -> super.checkBatchQuota(numWrites, numReads, isAtomic); + checkQuota(estimateQuota, checkQuota, numWrites, numReads, 0, isAtomic); } @Override @@ -62,11 +63,11 @@ public class ExceedOperationQuota extends DefaultOperationQuota { maxBlockBytesScanned, prevBlockBytesScannedDifference); CheckQuotaRunnable checkQuota = () -> super.checkScanQuota(scanRequest, maxScannerResultSize, maxBlockBytesScanned, prevBlockBytesScannedDifference); - checkQuota(estimateQuota, checkQuota, 0, 0, 1); + checkQuota(estimateQuota, checkQuota, 0, 0, 1, false); } private void checkQuota(Runnable estimateQuota, CheckQuotaRunnable checkQuota, int numWrites, - int numReads, int numScans) throws RpcThrottlingException { + int numReads, int numScans, boolean isAtomic) throws RpcThrottlingException { if (regionServerLimiter.isBypass()) { // If region server limiter is bypass, which means no region server quota is set, check and // throttle by all other quotas. In this condition, exceed throttle quota will not work. @@ -77,7 +78,7 @@ public class ExceedOperationQuota extends DefaultOperationQuota { estimateQuota.run(); // 2. Check if region server limiter is enough. If not, throw RpcThrottlingException. regionServerLimiter.checkQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + writeCapacityUnitConsumed, readCapacityUnitConsumed, isAtomic); // 3. Check if other limiters are enough. If not, exceed other limiters because region server // limiter is enough. boolean exceed = false; @@ -93,13 +94,13 @@ public class ExceedOperationQuota extends DefaultOperationQuota { // 4. Region server limiter is enough and grab estimated consume quota. readAvailable = Math.max(readAvailable, regionServerLimiter.getReadAvailable()); regionServerLimiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed); + writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic); if (exceed) { // 5. Other quota limiter is exceeded and has not been grabbed (because throw // RpcThrottlingException in Step 3), so grab it. for (final QuotaLimiter limiter : limiters) { limiter.grabQuota(numWrites, writeConsumed, numReads + numScans, readConsumed, - writeCapacityUnitConsumed, writeCapacityUnitConsumed); + writeCapacityUnitConsumed, writeCapacityUnitConsumed, isAtomic); } } } @@ -109,10 +110,10 @@ public class ExceedOperationQuota extends DefaultOperationQuota { public void close() { super.close(); if (writeDiff != 0) { - regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff); + regionServerLimiter.consumeWrite(writeDiff, writeCapacityUnitDiff, false); } if (readDiff != 0) { - regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff); + regionServerLimiter.consumeRead(readDiff, readCapacityUnitDiff, false); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java index ebde3ed80dc..6afbebc6e86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/GlobalQuotaSettingsImpl.java @@ -159,6 +159,21 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { hasThrottle = true; } break; + case ATOMIC_READ_SIZE: + if (throttleBuilder.hasAtomicReadSize()) { + hasThrottle = true; + } + break; + case ATOMIC_REQUEST_NUMBER: + if (throttleBuilder.hasAtomicReqNum()) { + hasThrottle = true; + } + break; + case ATOMIC_WRITE_SIZE: + if (throttleBuilder.hasAtomicWriteSize()) { + hasThrottle = true; + } + break; default: } return hasThrottle; @@ -212,6 +227,15 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case WRITE_CAPACITY_UNIT: throttleBuilder.clearWriteCapacityUnit(); break; + case ATOMIC_READ_SIZE: + throttleBuilder.clearAtomicReadSize(); + break; + case ATOMIC_REQUEST_NUMBER: + throttleBuilder.clearAtomicReqNum(); + break; + case ATOMIC_WRITE_SIZE: + throttleBuilder.clearAtomicWriteSize(); + break; default: } boolean hasThrottle = false; @@ -262,6 +286,15 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case WRITE_CAPACITY_UNIT: throttleBuilder.setWriteCapacityUnit(otherProto.getTimedQuota()); break; + case ATOMIC_READ_SIZE: + throttleBuilder.setAtomicReadSize(otherProto.getTimedQuota()); + break; + case ATOMIC_REQUEST_NUMBER: + throttleBuilder.setAtomicReqNum(otherProto.getTimedQuota()); + break; + case ATOMIC_WRITE_SIZE: + throttleBuilder.setAtomicWriteSize(otherProto.getTimedQuota()); + break; default: } } @@ -341,11 +374,14 @@ public class GlobalQuotaSettingsImpl extends GlobalQuotaSettings { case REQUEST_NUMBER: case WRITE_NUMBER: case READ_NUMBER: + case ATOMIC_REQUEST_NUMBER: builder.append(String.format("%dreq", timedQuota.getSoftLimit())); break; case REQUEST_SIZE: case WRITE_SIZE: case READ_SIZE: + case ATOMIC_READ_SIZE: + case ATOMIC_WRITE_SIZE: builder.append(sizeToString(timedQuota.getSoftLimit())); break; case REQUEST_CAPACITY_UNIT: diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java index 63cf97188d8..9143e12de00 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopOperationQuota.java @@ -43,7 +43,8 @@ class NoopOperationQuota implements OperationQuota { } @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index 5ece0be2b5a..7c02dbc1134 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -34,24 +34,24 @@ class NoopQuotaLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) - throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit, + boolean isAtomic) throws RpcThrottlingException { // no-op } @Override public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit) { + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) { // no-op } @Override - public void consumeWrite(final long size, long capacityUnit) { + public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) { // no-op } @Override - public void consumeRead(final long size, long capacityUnit) { + public void consumeRead(final long size, long capacityUnit, boolean isAtomic) { // no-op } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java index 0d9b48b6074..b95a617e127 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/OperationQuota.java @@ -57,7 +57,7 @@ public interface OperationQuota { * @throws RpcThrottlingException if the operation cannot be performed because RPC quota is * exceeded. */ - void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException; + void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) throws RpcThrottlingException; /** * Checks if it is possible to execute the scan. The quota will be estimated based on the diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java index 3999ba643a0..fd9e1993198 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaCache.java @@ -218,6 +218,10 @@ public class QuotaCache implements Stoppable { refreshChore.triggerNow(); } + void forceSynchronousCacheRefresh() { + refreshChore.chore(); + } + long getLastUpdate() { return refreshChore.lastUpdate; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 12e4c4a7c6a..1b5a1302a20 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -42,7 +42,8 @@ public interface QuotaLimiter { * @throws RpcThrottlingException thrown if not enough available resources to perform operation. */ void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, long estimateReadSize, - long estimateWriteCapacityUnit, long estimateReadCapacityUnit) throws RpcThrottlingException; + long estimateWriteCapacityUnit, long estimateReadCapacityUnit, boolean isAtomic) + throws RpcThrottlingException; /** * Removes the specified write and read amount from the quota. At this point the write and read @@ -56,19 +57,19 @@ public interface QuotaLimiter { * @param readCapacityUnit the read capacity unit num that will be removed from the current quota */ void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit); + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic); /** * Removes or add back some write amount to the quota. (called at the end of an operation in case * the estimate quota was off) */ - void consumeWrite(long size, long capacityUnit); + void consumeWrite(long size, long capacityUnit, boolean isAtomic); /** * Removes or add back some read amount to the quota. (called at the end of an operation in case * the estimate quota was off) */ - void consumeRead(long size, long capacityUnit); + void consumeRead(long size, long capacityUnit, boolean isAtomic); /** Returns true if the limiter is a noop */ boolean isBypass(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java index 8e267d4e8bf..222645ca998 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java @@ -96,6 +96,12 @@ public class QuotaUtil extends QuotaTableUtil { "hbase.quota.default.user.machine.write.num"; public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE = "hbase.quota.default.user.machine.write.size"; + public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE = + "hbase.quota.default.user.machine.atomic.read.size"; + public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM = + "hbase.quota.default.user.machine.atomic.request.num"; + public static final String QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE = + "hbase.quota.default.user.machine.atomic.write.size"; /** Table descriptor for Quota internal table */ public static final TableDescriptor QUOTA_TABLE_DESC = @@ -389,6 +395,12 @@ public class QuotaUtil extends QuotaTableUtil { .ifPresent(throttleBuilder::setWriteNum); buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE) .ifPresent(throttleBuilder::setWriteSize); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE) + .ifPresent(throttleBuilder::setAtomicReadSize); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM) + .ifPresent(throttleBuilder::setAtomicReqNum); + buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE) + .ifPresent(throttleBuilder::setAtomicWriteSize); UserQuotaState state = new UserQuotaState(nowTs); QuotaProtos.Quotas defaultQuotas = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java index f9a7ccba401..d847a9eb3dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerRpcQuotaManager.java @@ -186,11 +186,11 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { final OperationQuota.OperationType type) throws IOException, RpcThrottlingException { switch (type) { case GET: - return this.checkBatchQuota(region, 0, 1); + return this.checkBatchQuota(region, 0, 1, false); case MUTATE: - return this.checkBatchQuota(region, 1, 0); + return this.checkBatchQuota(region, 1, 0, false); case CHECK_AND_MUTATE: - return this.checkBatchQuota(region, 1, 1); + return this.checkBatchQuota(region, 1, 1, true); } throw new RuntimeException("Invalid operation type: " + type); } @@ -201,6 +201,7 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { throws IOException, RpcThrottlingException { int numWrites = 0; int numReads = 0; + boolean isAtomic = false; for (final ClientProtos.Action action : actions) { if (action.hasMutation()) { numWrites++; @@ -208,12 +209,16 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { QuotaUtil.getQuotaOperationType(action, hasCondition); if (operationType == OperationQuota.OperationType.CHECK_AND_MUTATE) { numReads++; + // If any mutations in this batch are atomic, we will count the entire batch as atomic. + // This is a conservative approach, but it is the best that we can do without knowing + // the block bytes scanned of each individual action. + isAtomic = true; } } else if (action.hasGet()) { numReads++; } } - return checkBatchQuota(region, numWrites, numReads); + return checkBatchQuota(region, numWrites, numReads, isAtomic); } /** @@ -227,7 +232,7 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { */ @Override public OperationQuota checkBatchQuota(final Region region, final int numWrites, - final int numReads) throws IOException, RpcThrottlingException { + final int numReads, boolean isAtomic) throws IOException, RpcThrottlingException { Optional<User> user = RpcServer.getRequestUser(); UserGroupInformation ugi; if (user.isPresent()) { @@ -240,7 +245,7 @@ public class RegionServerRpcQuotaManager implements RpcQuotaManager { OperationQuota quota = getQuota(ugi, table, region.getMinBlockSizeBytes()); try { - quota.checkBatchQuota(numWrites, numReads); + quota.checkBatchQuota(numWrites, numReads, isAtomic); } catch (RpcThrottlingException e) { LOG.debug("Throttling exception for user=" + ugi.getUserName() + " table=" + table + " numWrites=" + numWrites + " numReads=" + numReads + ": " + e.getMessage()); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java index 60392ca3b3f..3f84f11a7e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RpcQuotaManager.java @@ -87,6 +87,6 @@ public interface RpcQuotaManager { * @return the OperationQuota * @throws RpcThrottlingException if the operation cannot be executed due to quota exceeded. */ - OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) + OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads, boolean isAtomic) throws IOException, RpcThrottlingException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index f5170b09c83..e62d98242e4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -42,6 +42,9 @@ public class TimeBasedLimiter implements QuotaLimiter { private RateLimiter reqCapacityUnitLimiter = null; private RateLimiter writeCapacityUnitLimiter = null; private RateLimiter readCapacityUnitLimiter = null; + private RateLimiter atomicReqLimiter = null; + private RateLimiter atomicReadSizeLimiter = null; + private RateLimiter atomicWriteSizeLimiter = null; private TimeBasedLimiter() { if ( @@ -60,6 +63,9 @@ public class TimeBasedLimiter implements QuotaLimiter { reqCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval); writeCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval); readCapacityUnitLimiter = new FixedIntervalRateLimiter(refillInterval); + atomicReqLimiter = new FixedIntervalRateLimiter(refillInterval); + atomicReadSizeLimiter = new FixedIntervalRateLimiter(refillInterval); + atomicWriteSizeLimiter = new FixedIntervalRateLimiter(refillInterval); } else { reqsLimiter = new AverageIntervalRateLimiter(); reqSizeLimiter = new AverageIntervalRateLimiter(); @@ -70,6 +76,9 @@ public class TimeBasedLimiter implements QuotaLimiter { reqCapacityUnitLimiter = new AverageIntervalRateLimiter(); writeCapacityUnitLimiter = new AverageIntervalRateLimiter(); readCapacityUnitLimiter = new AverageIntervalRateLimiter(); + atomicReqLimiter = new AverageIntervalRateLimiter(); + atomicReadSizeLimiter = new AverageIntervalRateLimiter(); + atomicWriteSizeLimiter = new AverageIntervalRateLimiter(); } } @@ -120,6 +129,22 @@ public class TimeBasedLimiter implements QuotaLimiter { setFromTimedQuota(limiter.readCapacityUnitLimiter, throttle.getReadCapacityUnit()); isBypass = false; } + + if (throttle.hasAtomicReqNum()) { + setFromTimedQuota(limiter.atomicReqLimiter, throttle.getAtomicReqNum()); + isBypass = false; + } + + if (throttle.hasAtomicReadSize()) { + setFromTimedQuota(limiter.atomicReadSizeLimiter, throttle.getAtomicReadSize()); + isBypass = false; + } + + if (throttle.hasAtomicWriteSize()) { + setFromTimedQuota(limiter.atomicWriteSizeLimiter, throttle.getAtomicWriteSize()); + isBypass = false; + } + return isBypass ? NoopQuotaLimiter.get() : limiter; } @@ -133,6 +158,9 @@ public class TimeBasedLimiter implements QuotaLimiter { reqCapacityUnitLimiter.update(other.reqCapacityUnitLimiter); writeCapacityUnitLimiter.update(other.writeCapacityUnitLimiter); readCapacityUnitLimiter.update(other.readCapacityUnitLimiter); + atomicReqLimiter.update(other.atomicReqLimiter); + atomicReadSizeLimiter.update(other.atomicReadSizeLimiter); + atomicWriteSizeLimiter.update(other.atomicWriteSizeLimiter); } private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) { @@ -141,8 +169,8 @@ public class TimeBasedLimiter implements QuotaLimiter { @Override public void checkQuota(long writeReqs, long estimateWriteSize, long readReqs, - long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit) - throws RpcThrottlingException { + long estimateReadSize, long estimateWriteCapacityUnit, long estimateReadCapacityUnit, + boolean isAtomic) throws RpcThrottlingException { long waitInterval = reqsLimiter.getWaitIntervalMs(writeReqs + readReqs); if (waitInterval > 0) { RpcThrottlingException.throwNumRequestsExceeded(waitInterval); @@ -156,6 +184,12 @@ public class TimeBasedLimiter implements QuotaLimiter { if (waitInterval > 0) { RpcThrottlingException.throwRequestCapacityUnitExceeded(waitInterval); } + if (isAtomic) { + waitInterval = atomicReqLimiter.getWaitIntervalMs(writeReqs + readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwAtomicRequestNumberExceeded(waitInterval); + } + } if (estimateWriteSize > 0) { waitInterval = writeReqsLimiter.getWaitIntervalMs(writeReqs); @@ -170,6 +204,12 @@ public class TimeBasedLimiter implements QuotaLimiter { if (waitInterval > 0) { RpcThrottlingException.throwWriteCapacityUnitExceeded(waitInterval); } + if (isAtomic) { + waitInterval = atomicWriteSizeLimiter.getWaitIntervalMs(writeReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwAtomicWriteSizeExceeded(waitInterval); + } + } } if (estimateReadSize > 0) { @@ -185,12 +225,18 @@ public class TimeBasedLimiter implements QuotaLimiter { if (waitInterval > 0) { RpcThrottlingException.throwReadCapacityUnitExceeded(waitInterval); } + if (isAtomic) { + waitInterval = atomicReadSizeLimiter.getWaitIntervalMs(writeReqs + readReqs); + if (waitInterval > 0) { + RpcThrottlingException.throwAtomicReadSizeExceeded(waitInterval); + } + } } } @Override public void grabQuota(long writeReqs, long writeSize, long readReqs, long readSize, - long writeCapacityUnit, long readCapacityUnit) { + long writeCapacityUnit, long readCapacityUnit, boolean isAtomic) { assert writeSize != 0 || readSize != 0; reqsLimiter.consume(writeReqs + readReqs); @@ -212,22 +258,37 @@ public class TimeBasedLimiter implements QuotaLimiter { reqCapacityUnitLimiter.consume(readCapacityUnit); readCapacityUnitLimiter.consume(readCapacityUnit); } + if (isAtomic) { + atomicReqLimiter.consume(writeReqs + readReqs); + if (readSize > 0) { + atomicReadSizeLimiter.consume(readSize); + } + if (writeSize > 0) { + atomicWriteSizeLimiter.consume(writeSize); + } + } } @Override - public void consumeWrite(final long size, long capacityUnit) { + public void consumeWrite(final long size, long capacityUnit, boolean isAtomic) { reqSizeLimiter.consume(size); writeSizeLimiter.consume(size); reqCapacityUnitLimiter.consume(capacityUnit); writeCapacityUnitLimiter.consume(capacityUnit); + if (isAtomic) { + atomicWriteSizeLimiter.consume(size); + } } @Override - public void consumeRead(final long size, long capacityUnit) { + public void consumeRead(final long size, long capacityUnit, boolean isAtomic) { reqSizeLimiter.consume(size); readSizeLimiter.consume(size); reqCapacityUnitLimiter.consume(capacityUnit); readCapacityUnitLimiter.consume(capacityUnit); + if (isAtomic) { + atomicReadSizeLimiter.consume(size); + } } @Override @@ -307,6 +368,15 @@ public class TimeBasedLimiter implements QuotaLimiter { if (!readCapacityUnitLimiter.isBypass()) { builder.append(" readCapacityUnit=" + readCapacityUnitLimiter); } + if (!atomicReqLimiter.isBypass()) { + builder.append(" atomicReqLimiter=" + atomicReqLimiter); + } + if (!atomicReadSizeLimiter.isBypass()) { + builder.append(" atomicReadSizeLimiter=" + atomicReadSizeLimiter); + } + if (!atomicWriteSizeLimiter.isBypass()) { + builder.append(" atomicWriteSizeLimiter=" + atomicWriteSizeLimiter); + } builder.append(')'); return builder.toString(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 14ea81817a5..703f06141bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -227,7 +227,7 @@ public class RegionCoprocessorHost @Override public OperationQuota checkBatchQuota(final Region region, int numWrites, int numReads) throws IOException, RpcThrottlingException { - return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads); + return rpcQuotaManager.checkBatchQuota(region, numWrites, numReads, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java index 9b654ac8e6d..9a95a6b59c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestAtomicReadQuota.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.CheckAndMutate; +import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.RowMutations; @@ -77,7 +78,7 @@ public class TestAtomicReadQuota { @Test public void testIncrementCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); inc.addColumn(FAMILY, QUALIFIER, 1); @@ -86,7 +87,7 @@ public class TestAtomicReadQuota { @Test public void testConditionalRowMutationsCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); Increment inc = new Increment(row); @@ -102,7 +103,7 @@ public class TestAtomicReadQuota { @Test public void testNonConditionalRowMutationsOmittedFromReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); Put put = new Put(row); @@ -119,44 +120,19 @@ public class TestAtomicReadQuota { @Test public void testNonAtomicPutOmittedFromReadCapacity() throws Exception { - setupQuota(); - - byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); - Put put = new Put(row); - put.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); - try (Table table = getTable()) { - for (int i = 0; i < 100; i++) { - table.put(put); - } - } + setupGenericQuota(); + runNonAtomicPuts(); } @Test public void testNonAtomicMultiPutOmittedFromReadCapacity() throws Exception { - setupQuota(); - - Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); - put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); - Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); - put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); - - Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); - inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); - - List<Put> puts = new ArrayList<>(2); - puts.add(put1); - puts.add(put2); - - try (Table table = getTable()) { - for (int i = 0; i < 100; i++) { - table.put(puts); - } - } + setupGenericQuota(); + runNonAtomicPuts(); } @Test public void testCheckAndMutateCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); byte[] value = Bytes.toBytes("v"); @@ -170,7 +146,26 @@ public class TestAtomicReadQuota { @Test public void testAtomicBatchCountedAgainstReadCapacity() throws Exception { - setupQuota(); + setupGenericQuota(); + + byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); + Increment inc = new Increment(row); + inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); + + List<Increment> incs = new ArrayList<>(2); + incs.add(inc); + incs.add(inc); + + testThrottle(table -> { + Object[] results = new Object[] {}; + table.batch(incs, results); + return results; + }); + } + + @Test + public void testAtomicBatchCountedAgainstAtomicOnlyReqNum() throws Exception { + setupAtomicOnlyReqNumQuota(); byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); Increment inc = new Increment(row); @@ -187,7 +182,78 @@ public class TestAtomicReadQuota { }); } - private void setupQuota() throws Exception { + @Test + public void testAtomicBatchCountedAgainstAtomicOnlyReadSize() throws Exception { + setupAtomicOnlyReadSizeQuota(); + + byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); + Increment inc = new Increment(row); + inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); + + List<Increment> incs = new ArrayList<>(2); + incs.add(inc); + incs.add(inc); + + testThrottle(table -> { + Object[] results = new Object[] {}; + table.batch(incs, results); + return results; + }); + } + + @Test + public void testNonAtomicWritesIgnoredByAtomicOnlyReqNum() throws Exception { + setupAtomicOnlyReqNumQuota(); + runNonAtomicPuts(); + } + + @Test + public void testNonAtomicWritesIgnoredByAtomicOnlyReadSize() throws Exception { + setupAtomicOnlyReadSizeQuota(); + runNonAtomicPuts(); + } + + @Test + public void testNonAtomicReadsIgnoredByAtomicOnlyReqNum() throws Exception { + setupAtomicOnlyReqNumQuota(); + runNonAtomicReads(); + } + + @Test + public void testNonAtomicReadsIgnoredByAtomicOnlyReadSize() throws Exception { + setupAtomicOnlyReadSizeQuota(); + runNonAtomicReads(); + } + + private void runNonAtomicPuts() throws Exception { + Put put1 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); + put1.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); + Put put2 = new Put(Bytes.toBytes(UUID.randomUUID().toString())); + put2.addColumn(FAMILY, Bytes.toBytes("doot"), Bytes.toBytes("v")); + + Increment inc = new Increment(Bytes.toBytes(UUID.randomUUID().toString())); + inc.addColumn(FAMILY, Bytes.toBytes("doot"), 1); + + List<Put> puts = new ArrayList<>(2); + puts.add(put1); + puts.add(put2); + + try (Table table = getTable()) { + for (int i = 0; i < 100; i++) { + table.put(puts); + } + } + } + + private void runNonAtomicReads() throws Exception { + try (Table table = getTable()) { + byte[] row = Bytes.toBytes(UUID.randomUUID().toString()); + Get get = new Get(row); + table.get(get); + } + } + + private void setupGenericQuota() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), ThrottleType.READ_NUMBER, 1, TimeUnit.MINUTES)); @@ -195,6 +261,22 @@ public class TestAtomicReadQuota { ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); } + private void setupAtomicOnlyReqNumQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), + ThrottleType.ATOMIC_REQUEST_NUMBER, 1, TimeUnit.MINUTES)); + } + ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + } + + private void setupAtomicOnlyReadSizeQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.throttleUser(User.getCurrent().getShortName(), + ThrottleType.ATOMIC_READ_SIZE, 1, TimeUnit.MINUTES)); + } + ThrottleQuotaTestUtil.triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + } + private void cleanupQuota() throws Exception { try (Admin admin = TEST_UTIL.getAdmin()) { admin.setQuota(QuotaSettingsFactory.unthrottleUser(User.getCurrent().getShortName())); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java new file mode 100644 index 00000000000..383068259a4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultAtomicQuota.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.quotas; + +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh; +import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota; + +import java.io.IOException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestDefaultAtomicQuota { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestDefaultAtomicQuota.class); + private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString()); + private static final int REFRESH_TIME = 5; + private static final byte[] FAMILY = Bytes.toBytes("cf"); + private static final byte[] QUALIFIER = Bytes.toBytes("q"); + + @AfterClass + public static void tearDown() throws Exception { + ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL); + EnvironmentEdgeManager.reset(); + TEST_UTIL.deleteTable(TABLE_NAME); + TEST_UTIL.shutdownMiniCluster(); + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // quotas enabled, using block bytes scanned + TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true); + TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME); + TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_ATOMIC_READ_SIZE, 1); + TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_ATOMIC_REQUEST_NUM, 1); + TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_ATOMIC_WRITE_SIZE, 1); + + // don't cache blocks to make IO predictable + TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f); + + TEST_UTIL.startMiniCluster(1); + TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME); + TEST_UTIL.createTable(TABLE_NAME, FAMILY); + TEST_UTIL.waitTableAvailable(TABLE_NAME); + QuotaCache.TEST_FORCE_REFRESH = true; + TEST_UTIL.flush(TABLE_NAME); + } + + @Test + public void testDefaultAtomicReadLimits() throws Exception { + // No write throttling + configureLenientThrottle(ThrottleType.ATOMIC_WRITE_SIZE); + refreshQuotas(); + + // Should have a strict throttle by default + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + + // Add big quota and should be effectively unlimited + configureLenientThrottle(ThrottleType.ATOMIC_READ_SIZE); + configureLenientThrottle(ThrottleType.ATOMIC_REQUEST_NUMBER); + refreshQuotas(); + // Should run without error + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) == 100); + + // Remove all the limits, and should revert to strict default + unsetQuota(); + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + } + + @Test + public void testDefaultAtomicWriteLimits() throws Exception { + // No read throttling + configureLenientThrottle(ThrottleType.ATOMIC_REQUEST_NUMBER); + configureLenientThrottle(ThrottleType.ATOMIC_READ_SIZE); + refreshQuotas(); + + // Should have a strict throttle by default + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + + // Add big quota and should be effectively unlimited + configureLenientThrottle(ThrottleType.ATOMIC_WRITE_SIZE); + refreshQuotas(); + // Should run without error + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) == 100); + + // Remove all the limits, and should revert to strict default + unsetQuota(); + TEST_UTIL.waitFor(60_000, () -> runIncTest(100) < 100); + } + + private void configureLenientThrottle(ThrottleType throttleType) throws IOException { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota( + QuotaSettingsFactory.throttleUser(getUserName(), throttleType, 100_000, TimeUnit.SECONDS)); + } + } + + private static String getUserName() throws IOException { + return User.getCurrent().getShortName(); + } + + private void refreshQuotas() throws Exception { + triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME); + waitMinuteQuota(); + } + + private void unsetQuota() throws Exception { + try (Admin admin = TEST_UTIL.getAdmin()) { + admin.setQuota(QuotaSettingsFactory.unthrottleUser(getUserName())); + } + refreshQuotas(); + } + + private long runIncTest(int attempts) throws Exception { + refreshQuotas(); + try (Table table = getTable()) { + return ThrottleQuotaTestUtil.doIncrements(attempts, FAMILY, QUALIFIER, table); + } + } + + private Table getTable() throws IOException { + TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); + return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250) + .build(); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index a6b7ba6fee5..beeab8aef5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -153,14 +153,14 @@ public class TestDefaultOperationQuota { DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use the whole limit - quota.checkBatchQuota(0, limit); + quota.checkBatchQuota(0, limit, false); // the next request should be rejected - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // after the TimeUnit, the limit should be refilled - quota.checkBatchQuota(0, limit); + quota.checkBatchQuota(0, limit, false); } @Test @@ -174,14 +174,14 @@ public class TestDefaultOperationQuota { DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use the whole limit - quota.checkBatchQuota(limit, 0); + quota.checkBatchQuota(limit, 0, false); // the next request should be rejected - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); envEdge.incValue(1000); // after the TimeUnit, the limit should be refilled - quota.checkBatchQuota(limit, 0); + quota.checkBatchQuota(limit, 0, false); } @Test @@ -195,14 +195,14 @@ public class TestDefaultOperationQuota { DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use more than the limit, which should succeed rather than being indefinitely blocked - quota.checkBatchQuota(0, 10 + limit); + quota.checkBatchQuota(0, 10 + limit, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit, false)); } @Test @@ -216,14 +216,14 @@ public class TestDefaultOperationQuota { DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // use more than the limit, which should succeed rather than being indefinitely blocked - quota.checkBatchQuota(10 + limit, 0); + quota.checkBatchQuota(10 + limit, 0, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); } @Test @@ -237,14 +237,14 @@ public class TestDefaultOperationQuota { DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); // writes are estimated a 100 bytes, so this will use 2x the limit but should not be blocked - quota.checkBatchQuota(1, 0); + quota.checkBatchQuota(1, 0, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0, false)); } @Test @@ -260,14 +260,14 @@ public class TestDefaultOperationQuota { new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked - quota.checkBatchQuota(0, 1); + quota.checkBatchQuota(0, 1, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); } @Test @@ -283,13 +283,13 @@ public class TestDefaultOperationQuota { new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked - quota.checkBatchQuota(0, 1); + quota.checkBatchQuota(0, 1, false); // the next request should be blocked - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1, false)); envEdge.incValue(1000); // even after the TimeUnit, the limit should not be refilled because we oversubscribed - assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1, false)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java index ad2b79075a3..7fd686de94b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestNoopOperationQuota.java @@ -28,7 +28,8 @@ public class TestNoopOperationQuota implements OperationQuota { public static final TestNoopOperationQuota INSTANCE = new TestNoopOperationQuota(); @Override - public void checkBatchQuota(int numWrites, int numReads) throws RpcThrottlingException { + public void checkBatchQuota(int numWrites, int numReads, boolean isAtomic) + throws RpcThrottlingException { } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java index cd266fa0baa..cf15cef292c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java @@ -773,6 +773,14 @@ public class TestQuotaAdmin { assertTrue(rpcQuota.hasWriteCapacityUnit()); t = rpcQuota.getWriteCapacityUnit(); break; + case ATOMIC_READ_SIZE: + assertTrue(rpcQuota.hasAtomicReadSize()); + t = rpcQuota.getAtomicReadSize(); + break; + case ATOMIC_REQUEST_NUMBER: + assertTrue(rpcQuota.hasAtomicReqNum()); + t = rpcQuota.getAtomicReqNum(); + break; default: } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java index cbd40f7bd81..d64b1002b1e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaState.java @@ -224,7 +224,7 @@ public class TestQuotaState { assertFalse(quotaInfo.isBypass()); QuotaLimiter limiter = quotaInfo.getTableLimiter(TABLE_A); try { - limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0); + limiter.checkQuota(TABLE_A_THROTTLE_1 + 1, TABLE_A_THROTTLE_1 + 1, 0, 0, 1, 0, false); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -241,7 +241,7 @@ public class TestQuotaState { private void assertThrottleException(final QuotaLimiter limiter, final int availReqs) { assertNoThrottleException(limiter, availReqs); try { - limiter.checkQuota(1, 1, 0, 0, 1, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0, false); fail("Should have thrown RpcThrottlingException"); } catch (RpcThrottlingException e) { // expected @@ -251,11 +251,11 @@ public class TestQuotaState { private void assertNoThrottleException(final QuotaLimiter limiter, final int availReqs) { for (int i = 0; i < availReqs; ++i) { try { - limiter.checkQuota(1, 1, 0, 0, 1, 0); + limiter.checkQuota(1, 1, 0, 0, 1, 0, false); } catch (RpcThrottlingException e) { fail("Unexpected RpcThrottlingException after " + i + " requests. limit=" + availReqs); } - limiter.grabQuota(1, 1, 0, 0, 1, 0); + limiter.grabQuota(1, 1, 0, 0, 1, 0, false); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java index 8da2989921a..ad097b5e07d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/ThrottleQuotaTestUtil.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Increment; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -129,6 +130,23 @@ public final class ThrottleQuotaTestUtil { return count; } + static long doIncrements(int maxOps, byte[] family, byte[] qualifier, final Table... tables) { + int count = 0; + try { + while (count < maxOps) { + Increment inc = new Increment(Bytes.toBytes("row-" + count)); + inc.addColumn(family, qualifier, 1L); + for (final Table table : tables) { + table.increment(inc); + } + count += tables.length; + } + } catch (IOException e) { + LOG.error("increment failed after nRetries=" + count, e); + } + return count; + } + static long doMultiGets(int maxOps, int batchSize, int rowCount, byte[] family, byte[] qualifier, final Table... tables) { int opCount = 0; @@ -202,7 +220,7 @@ public final class ThrottleQuotaTestUtil { RegionServerRpcQuotaManager quotaManager = rst.getRegionServer().getRegionServerRpcQuotaManager(); QuotaCache quotaCache = quotaManager.getQuotaCache(); - quotaCache.triggerCacheRefresh(); + quotaCache.forceSynchronousCacheRefresh(); Thread.sleep(250); testUtil.waitFor(60000, 250, new ExplainingPredicate<Exception>() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java index acdd2101b95..307c5733be2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerLeaseCount.java @@ -182,8 +182,8 @@ public class TestScannerLeaseCount { } @Override - public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads) - throws IOException, RpcThrottlingException { + public OperationQuota checkBatchQuota(Region region, int numWrites, int numReads, + boolean isAtomic) throws IOException, RpcThrottlingException { if (SHOULD_THROW) { throw EX; }
