This is an automated email from the ASF dual-hosted git repository. ndimiduk pushed a commit to branch branch-2.6 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new 794bf1ee97d HBASE-28672 Ensure large batches are not indefinitely blocked by quotas (#6003) 794bf1ee97d is described below commit 794bf1ee97d58283481967a93c35cf528e457178 Author: Ray Mattingly <rmdmattin...@gmail.com> AuthorDate: Mon Jul 8 19:28:44 2024 -0400 HBASE-28672 Ensure large batches are not indefinitely blocked by quotas (#6003) Co-authored-by: Ray Mattingly <rmattin...@hubspot.com> Signed-off-by: Bryan Beaudreault < bbeaudrea...@apache.org> Signed-off-by: Nick Dimiduk <ndimi...@apache.org> --- .../hadoop/hbase/quotas/DefaultOperationQuota.java | 12 +- .../hadoop/hbase/quotas/NoopQuotaLimiter.java | 20 +++ .../apache/hadoop/hbase/quotas/QuotaLimiter.java | 13 ++ .../hadoop/hbase/quotas/TimeBasedLimiter.java | 26 ++++ .../hbase/quotas/TestDefaultOperationQuota.java | 167 +++++++++++++++++++++ 5 files changed, 236 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java index 2e26765a6a1..a387c04e4e5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/DefaultOperationQuota.java @@ -111,8 +111,16 @@ public class DefaultOperationQuota implements OperationQuota { continue; } - limiter.checkQuota(numWrites, writeConsumed, numReads, readConsumed, - writeCapacityUnitConsumed, readCapacityUnitConsumed); + long maxRequestsToEstimate = limiter.getRequestNumLimit(); + long maxReadsToEstimate = Math.min(maxRequestsToEstimate, limiter.getReadNumLimit()); + long maxWritesToEstimate = Math.min(maxRequestsToEstimate, limiter.getWriteNumLimit()); + long maxReadSizeToEstimate = Math.min(readConsumed, limiter.getReadLimit()); + long maxWriteSizeToEstimate = Math.min(writeConsumed, limiter.getWriteLimit()); + + 
limiter.checkQuota(Math.min(maxWritesToEstimate, numWrites), + Math.min(maxWriteSizeToEstimate, writeConsumed), Math.min(maxReadsToEstimate, numReads), + Math.min(maxReadSizeToEstimate, readConsumed), writeCapacityUnitConsumed, + readCapacityUnitConsumed); readAvailable = Math.min(readAvailable, limiter.getReadAvailable()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java index cf1e49c12e5..5ece0be2b5a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/NoopQuotaLimiter.java @@ -65,6 +65,21 @@ class NoopQuotaLimiter implements QuotaLimiter { throw new UnsupportedOperationException(); } + @Override + public long getRequestNumLimit() { + return Long.MAX_VALUE; + } + + @Override + public long getReadNumLimit() { + return Long.MAX_VALUE; + } + + @Override + public long getWriteNumLimit() { + return Long.MAX_VALUE; + } + @Override public long getReadAvailable() { throw new UnsupportedOperationException(); @@ -75,6 +90,11 @@ class NoopQuotaLimiter implements QuotaLimiter { return Long.MAX_VALUE; } + @Override + public long getWriteLimit() { + return Long.MAX_VALUE; + } + @Override public String toString() { return "NoopQuotaLimiter"; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java index 8d00a702e25..12e4c4a7c6a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaLimiter.java @@ -79,6 +79,19 @@ public interface QuotaLimiter { /** Returns the maximum number of bytes ever available to read */ long getReadLimit(); + /** Returns the maximum number of bytes ever available to write */ + long getWriteLimit(); + /** Returns the 
number of bytes available to write to avoid exceeding the quota */ long getWriteAvailable(); + + /** Returns the maximum number of requests to allow per TimeUnit */ + long getRequestNumLimit(); + + /** Returns the maximum number of reads to allow per TimeUnit */ + long getReadNumLimit(); + + /** Returns the maximum number of writes to allow per TimeUnit */ + long getWriteNumLimit(); + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java index e6e143343f7..f5170b09c83 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java @@ -240,6 +240,27 @@ public class TimeBasedLimiter implements QuotaLimiter { return writeSizeLimiter.getAvailable(); } + @Override + public long getRequestNumLimit() { + long readAndWriteLimit = readReqsLimiter.getLimit() + writeReqsLimiter.getLimit(); + + if (readAndWriteLimit < 0) { // handle overflow + readAndWriteLimit = Long.MAX_VALUE; + } + + return Math.min(reqsLimiter.getLimit(), readAndWriteLimit); + } + + @Override + public long getReadNumLimit() { + return readReqsLimiter.getLimit(); + } + + @Override + public long getWriteNumLimit() { + return writeReqsLimiter.getLimit(); + } + @Override public long getReadAvailable() { return readSizeLimiter.getAvailable(); @@ -250,6 +271,11 @@ public class TimeBasedLimiter implements QuotaLimiter { return Math.min(readSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); } + @Override + public long getWriteLimit() { + return Math.min(writeSizeLimiter.getLimit(), reqSizeLimiter.getLimit()); + } + @Override public String toString() { StringBuilder builder = new StringBuilder(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java index 4684be02d69..a6b7ba6fee5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestDefaultOperationQuota.java @@ -18,21 +18,37 @@ package org.apache.hadoop.hbase.quotas; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos; + @Category({ RegionServerTests.class, SmallTests.class }) public class TestDefaultOperationQuota { @ClassRule public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestDefaultOperationQuota.class); + private static ManualEnvironmentEdge envEdge = new ManualEnvironmentEdge(); + static { + envEdge.setValue(EnvironmentEdgeManager.currentTime()); + // only activate the envEdge for quotas package + EnvironmentEdgeManagerTestHelper.injectEdgeForPackage(envEdge, + ThrottleQuotaTestUtil.class.getPackage().getName()); + } + @Test public void testScanEstimateNewScanner() { long blockSize = 64 * 1024; @@ -125,4 +141,155 @@ public class TestDefaultOperationQuota { // shrinking workload should only shrink estimate to maxBBS assertEquals(maxBlockBytesScanned, estimate); } + + @Test + public void 
testLargeBatchSaturatesReadNumLimit() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use the whole limit + quota.checkBatchQuota(0, limit); + + // the next request should be rejected + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // after the TimeUnit, the limit should be refilled + quota.checkBatchQuota(0, limit); + } + + @Test + public void testLargeBatchSaturatesReadWriteLimit() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use the whole limit + quota.checkBatchQuota(limit, 0); + + // the next request should be rejected + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // after the TimeUnit, the limit should be refilled + quota.checkBatchQuota(limit, 0); + } + + @Test + public void testTooLargeReadBatchIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + 
DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use more than the limit, which should succeed rather than being indefinitely blocked + quota.checkBatchQuota(0, 10 + limit); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, limit)); + } + + @Test + public void testTooLargeWriteBatchIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 10; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteNum(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 65536, limiter); + + // use more than the limit, which should succeed rather than being indefinitely blocked + quota.checkBatchQuota(10 + limit, 0); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + } + + @Test + public void testTooLargeWriteSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + int limit = 50; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setWriteSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = new DefaultOperationQuota(new Configuration(), 
65536, limiter); + + // writes are estimated at 100 bytes, so this will use 2x the limit but should not be blocked + quota.checkBatchQuota(1, 0); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(1, 0)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(limit, 0)); + } + + @Test + public void testTooLargeReadSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + long blockSize = 65536; + long limit = blockSize / 2; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReadSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = + new DefaultOperationQuota(new Configuration(), (int) blockSize, limiter); + + // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked + quota.checkBatchQuota(0, 1); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + } + + @Test + public void testTooLargeRequestSizeIsNotBlocked() + throws RpcThrottlingException, InterruptedException { + long blockSize = 65536; + long limit = blockSize / 2; + QuotaProtos.Throttle throttle = + QuotaProtos.Throttle.newBuilder().setReqSize(QuotaProtos.TimedQuota.newBuilder() + .setSoftLimit(limit).setTimeUnit(HBaseProtos.TimeUnit.SECONDS).build()).build(); + QuotaLimiter limiter = TimeBasedLimiter.fromThrottle(throttle); + DefaultOperationQuota quota = + new DefaultOperationQuota(new 
Configuration(), (int) blockSize, limiter); + + // reads are estimated at 1 block each, so this will use ~2x the limit but should not be blocked + quota.checkBatchQuota(0, 1); + + // the next request should be blocked + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota(0, 1)); + + envEdge.incValue(1000); + // even after the TimeUnit, the limit should not be refilled because we oversubscribed + assertThrows(RpcThrottlingException.class, () -> quota.checkBatchQuota((int) limit, 1)); + } }