chia7712 commented on code in PR #15889:
URL: https://github.com/apache/kafka/pull/15889#discussion_r1611391288


##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -106,44 +109,51 @@ public String toString() {
 
     public abstract double combine(List<Sample> samples, MetricConfig config, long now);
 
-    /* Timeout any windows that have expired in the absence of any events */
+    // purge any samples that lack observed events within the monitored window
     protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (Sample sample : samples) {
-            if (now - sample.lastWindowMs >= expireAge)
+            // samples overlapping the monitored window are kept,
+            // even if they started before it
+            if (now - sample.lastEventMs >= expireAge) {
                 sample.reset(now);
+            }
         }
     }
 
     protected static class Sample {
         public double initialValue;
         public long eventCount;
-        public long lastWindowMs;
+        public long startTimeMs;
+        public long lastEventMs;
         public double value;
 
         public Sample(double initialValue, long now) {
             this.initialValue = initialValue;
             this.eventCount = 0;
-            this.lastWindowMs = now;
+            this.startTimeMs = now;
+            this.lastEventMs = now;
             this.value = initialValue;
         }
 
         public void reset(long now) {
             this.eventCount = 0;
-            this.lastWindowMs = now;
+            this.startTimeMs = now;
+            this.lastEventMs = now;
             this.value = initialValue;
         }
 
         public boolean isComplete(long timeMs, MetricConfig config) {
-            return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
+            return timeMs - startTimeMs >= config.timeWindowMs() || eventCount >= config.eventWindow();

Review Comment:
   Please ignore my comments. See the following test:
   ```java
       private double test(long eventWindow) {
           Rate rate = new Rate();
           Time timer = new MockTime();
           final MetricConfig config = new MetricConfig().samples(2).timeWindow(1, SECONDS).eventWindow(eventWindow);
           // generate 9 records
           IntStream.range(0, 9).forEach(__ -> {
               rate.record(config, 1, timer.milliseconds());
               timer.sleep(1);
           });
           // almost 1 second
           timer.sleep(990);
           return rate.measure(config, timer.milliseconds());
       }
   ```
   
   **before**: 3 / second
   **after**: 4.5 / second
   
   I overlooked that this approach keeps an extra sample, so the measured value can be larger than before.
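
The purge change in the diff above can be illustrated in isolation. The following is a minimal sketch, not the actual `SampledStat` code: the class `PurgeSketch`, its `record` method, and the two helper predicates are hypothetical names introduced here, and it assumes that recording an event updates `lastEventMs` while `startTimeMs` stays fixed until a reset. It compares the old expiry check (age measured from the sample's start) with the new one (age measured from its last event) for a sample that started before the monitored window but still overlaps it.

```java
public class PurgeSketch {
    // Simplified stand-in for SampledStat.Sample: only the two timestamps from the diff.
    static final class Sample {
        long startTimeMs;
        long lastEventMs;

        Sample(long now) {
            this.startTimeMs = now;
            this.lastEventMs = now;
        }

        // Assumption: recording an event only moves lastEventMs forward.
        void record(long now) {
            this.lastEventMs = now;
        }
    }

    // Old check from the diff: expiry measured from when the sample started.
    static boolean expiredByStart(Sample s, long now, long expireAge) {
        return now - s.startTimeMs >= expireAge;
    }

    // New check from the diff: expiry measured from the sample's last event.
    static boolean expiredByLastEvent(Sample s, long now, long expireAge) {
        return now - s.lastEventMs >= expireAge;
    }

    public static void main(String[] args) {
        long expireAge = 2 * 1000L; // samples() * timeWindowMs() with 2 samples of 1 second
        Sample s = new Sample(0);   // sample starts at t = 0 ms
        s.record(900);              // last event at t = 900 ms
        long now = 2500;            // monitored window is [500, 2500]

        System.out.println("old check expires sample: " + expiredByStart(s, now, expireAge));
        System.out.println("new check expires sample: " + expiredByLastEvent(s, now, expireAge));
    }
}
```

Under these assumptions the old check resets the sample while the new check keeps it, which is consistent with the extra retained sample described in the comment.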


