This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch fix-broker-metrics
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit ca7004a14bf049bcd8bb58a7f52bd80bbfab778c
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Mon Apr 1 11:14:30 2024 -0700

    Fetch query quota capacity utilization rate metric in a callback function
---
 .../HelixExternalViewBasedQueryQuotaManager.java   | 30 +++++++++++++++++-----
 .../apache/pinot/broker/queryquota/HitCounter.java | 12 ++++++++-
 .../pinot/broker/queryquota/MaxHitRateTracker.java | 12 +++++++++
 .../broker/queryquota/MaxHitRateTrackerTest.java   |  5 ++++
 4 files changed, 51 insertions(+), 8 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
index 04db0f6a42..dabb95867b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HelixExternalViewBasedQueryQuotaManager.java
@@ -224,6 +224,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
           tableNameWithType, overallRate, previousRate, perBrokerRate, 
onlineCount, stat.getVersion());
     }
     addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
+    addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
     if (isQueryRateLimitDisabled()) {
       LOGGER.info("Query rate limiting is currently disabled for this broker. 
So it won't take effect immediately.");
     }
@@ -245,6 +246,7 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
       queryQuotaEntity.setRateLimiter(null);
     }
     addMaxBurstQPSCallbackTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
+    addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(tableNameWithType, 
queryQuotaEntity);
   }
 
   /**
@@ -256,6 +258,27 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
         () -> (long) 
finalQueryQuotaEntity.getMaxQpsTracker().getMaxCountPerBucket());
   }
 
+  /**
+   * Registers a callback gauge reporting the query quota capacity utilization (in percent) if a qps quota is set.
+   */
+  private void addQueryQuotaCapacityUtilizationRateTableGaugeIfNeeded(String tableNameWithType,
+      QueryQuotaEntity queryQuotaEntity) {
+    if (queryQuotaEntity.getRateLimiter() != null) {
+      final QueryQuotaEntity finalQueryQuotaEntity = queryQuotaEntity;
+      _brokerMetrics.setOrUpdateTableGauge(tableNameWithType, BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE, () -> {
+        double perBrokerRate = finalQueryQuotaEntity.getRateLimiter().getRate();
+        int actualHitCountWithinTimeRange = finalQueryQuotaEntity.getMaxQpsTracker().getHitCount();
+        // Hits allowed within the tracker's default time range; clamped to 1 so a tiny quota cannot divide by zero.
+        long hitCountAllowedWithinTimeRange = Math.max(1L,
+            (long) (perBrokerRate * finalQueryQuotaEntity.getMaxQpsTracker().getDefaultTimeRangeMs() / 1000L));
+        // Utilization = actual hits within that range / allowed hits within the same range, as a percentage.
+        long percentageOfCapacityUtilization = actualHitCountWithinTimeRange * 100L / hitCountAllowedWithinTimeRange;
+        LOGGER.debug("The percentage of rate limit capacity utilization is {}", percentageOfCapacityUtilization);
+        return percentageOfCapacityUtilization;
+      });
+    }
+  }
+
   /**
    * {@inheritDoc}
    * <p>Acquires a token from rate limiter based on the table name.
@@ -316,13 +339,6 @@ public class HelixExternalViewBasedQueryQuotaManager 
implements ClusterChangeHan
 
     // Emit the qps capacity utilization rate.
     int numHits = queryQuotaEntity.getQpsTracker().getHitCount();
-    if (_brokerMetrics != null) {
-      int percentageOfCapacityUtilization = (int) (numHits * 100 / 
perBrokerRate);
-      LOGGER.debug("The percentage of rate limit capacity utilization is {}", 
percentageOfCapacityUtilization);
-      _brokerMetrics.setValueOfTableGauge(tableNameWithType, 
BrokerGauge.QUERY_QUOTA_CAPACITY_UTILIZATION_RATE,
-          percentageOfCapacityUtilization);
-    }
-
     if (!rateLimiter.tryAcquire()) {
       LOGGER.info("Quota is exceeded for table: {}. Per-broker rate: {}. 
Current qps: {}", tableNameWithType,
           perBrokerRate, numHits);
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
index eedc53903d..b656c02344 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/HitCounter.java
@@ -83,10 +83,20 @@ public class HitCounter {
 
   @VisibleForTesting
   int getHitCount(long timestamp) {
+    return getHitCount(timestamp, _bucketCount);
+  }
+
+  /**
+   * Get the hit count within the valid number of buckets.
+   * @param timestamp the current timestamp
+   * @param validBucketCount the valid number of buckets
+   * @return the number of hits within the valid bucket count
+   */
+  int getHitCount(long timestamp, int validBucketCount) {
     long numTimeUnits = timestamp / _timeBucketWidthMs;
     int count = 0;
     for (int i = 0; i < _bucketCount; i++) {
-      if (numTimeUnits - _bucketStartTime.get(i) < _bucketCount) {
+      if (numTimeUnits - _bucketStartTime.get(i) < validBucketCount) {
         count += _bucketHitCount.get(i);
       }
     }
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
index bdb8dbc214..b0cbd88b0b 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/queryquota/MaxHitRateTracker.java
@@ -34,6 +34,7 @@ public class MaxHitRateTracker extends HitCounter {
   private static final int ONE_SECOND_BUCKET_WIDTH_MS = 1000;
   private static final int MAX_TIME_RANGE_FACTOR = 2;
 
+  private final int _validBucketCount;
   private final long _maxTimeRangeMs;
   private final long _defaultTimeRangeMs;
   private volatile long _lastAccessTimestamp;
@@ -44,6 +45,7 @@ public class MaxHitRateTracker extends HitCounter {
 
   private MaxHitRateTracker(int defaultTimeRangeInSeconds, int 
maxTimeRangeInSeconds) {
     super(maxTimeRangeInSeconds, (int) (maxTimeRangeInSeconds * 1000L / 
ONE_SECOND_BUCKET_WIDTH_MS));
+    _validBucketCount = (int) (defaultTimeRangeInSeconds * 1000L / 
ONE_SECOND_BUCKET_WIDTH_MS);
     _defaultTimeRangeMs = defaultTimeRangeInSeconds * 1000L;
     _maxTimeRangeMs = maxTimeRangeInSeconds * 1000L;
   }
@@ -80,4 +82,14 @@ public class MaxHitRateTracker extends HitCounter {
     _lastAccessTimestamp = now;
     return maxCount;
   }
+
+  @VisibleForTesting
+  @Override
+  int getHitCount(long now) {
+    return super.getHitCount(now, _validBucketCount);
+  }
+
+  public long getDefaultTimeRangeMs() {
+    return _defaultTimeRangeMs;
+  }
 }
diff --git 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
index b4d9cc5f93..f19c55504d 100644
--- 
a/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
+++ 
b/pinot-broker/src/test/java/org/apache/pinot/broker/queryquota/MaxHitRateTrackerTest.java
@@ -37,24 +37,29 @@ public class MaxHitRateTrackerTest {
     long latestTimeStamp = currentTimestamp + (timeInSec - 1) * 1000;
     Assert.assertNotNull(hitCounter);
     Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+    Assert.assertEquals(5 * 60, hitCounter.getHitCount(latestTimeStamp));
 
     // 2 seconds have passed, the hit counter should return 5 as well since 
the count in the last bucket could increase.
     latestTimeStamp = latestTimeStamp + 2000L;
     Assert.assertEquals(5, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+    Assert.assertEquals(5 * (60 - 2), hitCounter.getHitCount(latestTimeStamp));
 
     // This time it should return 0 as the internal lastAccessTimestamp has 
already been updated and there is no more
     // hits between the gap.
     latestTimeStamp = latestTimeStamp + 2000L;
     Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+    Assert.assertEquals(5 * (60 - 4), hitCounter.getHitCount(latestTimeStamp));
 
     // Increment the hit in this second and we should see the result becomes 1.
     hitCounter.hit(latestTimeStamp);
     latestTimeStamp = latestTimeStamp + 2000L;
     Assert.assertEquals(1, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+    Assert.assertEquals(5 * (60 - 6) + 1, 
hitCounter.getHitCount(latestTimeStamp));
 
     // More than a time range period has passed and the hit counter should 
return 0 as there is no hits.
     hitCounter.hit(latestTimeStamp);
     latestTimeStamp = latestTimeStamp + timeInSec * 2 * 1000L + 2000L;
     Assert.assertEquals(0, hitCounter.getMaxCountPerBucket(latestTimeStamp));
+    Assert.assertEquals(0, hitCounter.getHitCount(latestTimeStamp));
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to