This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch fix-broker-metrics in repository https://gitbox.apache.org/repos/asf/pinot.git
commit ca7004a14bf049bcd8bb58a7f52bd80bbfab778c Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Mon Apr 1 11:14:30 2024 -0700 Fetch query quota capacity utilization rate metric in a callback function --- .../HelixExternalViewBasedQueryQuotaManager.java | 30 +++++++++++++++++----- .../apache/pinot/broker/queryquota/HitCounter.java | 12 ++++++++- .../pinot/broker/queryquota/MaxHitRateTracker.java | 12 +++++++++ .../broker/queryquota/MaxHitRateTrackerTest.java | 5 ++++ 4 files changed, 51 insertions(+), 8 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java index 04db0f6a42..dabb95867b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java @@ -224,6 +224,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan tableNameWithType, overallRate, previousRate, perBrokerRate, onlineCount, stat.getVersion()); } addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); + addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); if (isQueryRateLimitDisabled()) { LOGGER.info("Query rate limiting is currently disabled for this broker. So it won't take effect immediately."); } @@ -245,6 +246,7 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan queryQuotaEntity.setRateLimiter(null); } addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); + addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, queryQuotaEntity); } /** @@ -256,6 +258,27 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan () -> (long) finalQueryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket()); } + /** + * Add the query quota capacity utilization rate table gauge to the metric system if the qps quota is specified. + */ + private void addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(String tableNameWithType, + QueryQuotaEntity queryQuotaEntity) { + if (queryQuotaEntity.getRateLimiter() != null) { + final QueryQuotaEntity finalQueryQuotaEntity = queryQuotaEntity; + _brokerMetrics.setOrUpdateTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE, () -> { + double perBrokerRate = finalQueryQuotaEntity.getRateLimiter().getRate(); + int actualHitCountWithinTimeRange = finalQueryQuotaEntity.getMaxQpsTracker().getHitCount(); + long hitCountAllowedWithinTimeRage = + (long) (perBrokerRate * finalQueryQuotaEntity.getMaxQpsTracker().getDefaultTimeRangeMs() / 1000L); + // Since the MaxQpsTracker specifies 1-min window as valid time range, we can get the query quota capacity + // utilization by using the actual hit count within 1 min divided by the expected hit count within 1 min. + long percentageOfCapacityUtilization = actualHitCountWithinTimeRange * 100L / hitCountAllowedWithinTimeRage; + LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization); + return percentageOfCapacityUtilization; + }); + } + } + /** * {@inheritDoc} * <p>Acquires a token from rate limiter based on the table name. @@ -316,13 +339,6 @@ public class HelixExternalViewBasedQueryQuotaManager implements ClusterChangeHan // Emit the qps capacity utilization rate. int numHits = queryQuotaEntity.getQpsTracker().getHitCount(); - if (_brokerMetrics != null) { - int percentageOfCapacityUtilization = (int) (numHits * 100 / perBrokerRate); - LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization); - _brokerMetrics.setValueOfTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE, - percentageOfCapacityUtilization); - } - if (!rateLimiter.tryAcquire()) { LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. Current qps: {}", tableNameWithType, perBrokerRate, numHits); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java index eedc53903d..b656c02344 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java @@ -83,10 +83,20 @@ public class HitCounter { @VisibleForTesting int getHitCount(long timestamp) { + return getHitCount(timestamp, _bucketCount); + } + + /** + * Get the hit count within the valid number of buckets. + * @param timestamp the current timestamp + * @param validBucketCount the valid number of buckets + * @return the number of hits within the valid bucket count + */ + int getHitCount(long timestamp, int validBucketCount) { long numTimeUnits = timestamp / _timeBucketWidthMs; int count = 0; for (int i = 0; i < _bucketCount; i++) { - if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) { + if (numTimeUnits - _bucketStartTime.get(i) < validBucketCount) { count += _bucketHitCount.get(i); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java index bdb8dbc214..b0cbd88b0b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java @@ -34,6 +34,7 @@ public class MaxHitRateTracker extends HitCounter { private static final int ONE_SECOND_BUCKET_WIDTH_MS = 1000; private static final int MAX_TIME_RANGE_FACTOR = 2; + private final int _validBucketCount; private final long _maxTimeRangeMs; private final long _defaultTimeRangeMs; private volatile long _lastAccessTimestamp; @@ -44,6 +45,7 @@ public class MaxHitRateTracker extends HitCounter { private MaxHitRateTracker(int defaultTimeRangeInSeconds, int maxTimeRangeInSeconds) { super(maxTimeRangeInSeconds, (int) (maxTimeRangeInSeconds * 1000L / ONE_SECOND_BUCKET_WIDTH_MS)); + _validBucketCount = (int) (defaultTimeRangeInSeconds * 1000L / ONE_SECOND_BUCKET_WIDTH_MS); _defaultTimeRangeMs = defaultTimeRangeInSeconds * 1000L; _maxTimeRangeMs = maxTimeRangeInSeconds * 1000L; } @@ -80,4 +82,14 @@ public class MaxHitRateTracker extends HitCounter { _lastAccessTimestamp = now; return maxCount; } + + @VisibleForTesting + @Override + int getHitCount(long now) { + return super.getHitCount(now, _validBucketCount); + } + + public long getDefaultTimeRangeMs() { + return _defaultTimeRangeMs; + } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java index b4d9cc5f93..f19c55504d 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java @@ -37,24 +37,29 @@ public class MaxHitRateTrackerTest { long latestTimeStamp = currentTimestamp + (timeInSec - 1) * 1000; Assert.assertNotNull(hitCounter); Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp)); + Assert.assertEquals(5 * 60, hitCounter.getHitCount(latestTimeStamp)); // 2 seconds have passed, the hit counter should return 5 as well since the count in the last bucket could increase. latestTimeStamp = latestTimeStamp + 2000L; Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp)); + Assert.assertEquals(5 * (60 - 2), hitCounter.getHitCount(latestTimeStamp)); // This time it should return 0 as the internal lastAccessTimestamp has already been updated and there is no more // hits between the gap. latestTimeStamp = latestTimeStamp + 2000L; Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp)); + Assert.assertEquals(5 * (60 - 4), hitCounter.getHitCount(latestTimeStamp)); // Increment the hit in this second and we should see the result becomes 1. hitCounter.hit(latestTimeStamp); latestTimeStamp = latestTimeStamp + 2000L; Assert.assertEquals(1, hitCounter.getMaxCountPerBucket(latestTimeStamp)); + Assert.assertEquals(5 * (60 - 6) + 1, hitCounter.getHitCount(latestTimeStamp)); // More than a time range period has passed and the hit counter should return 0 as there is no hits. hitCounter.hit(latestTimeStamp); latestTimeStamp = latestTimeStamp + timeInSec * 2 * 1000L + 2000L; Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp)); + Assert.assertEquals(0, hitCounter.getHitCount(latestTimeStamp)); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org