junrao commented on code in PR #15889: URL: https://github.com/apache/kafka/pull/15889#discussion_r1602125187
##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java:
##########
@@ -63,33 +63,38 @@ public void record(MetricConfig config, double value, long timeMs) {
     @Override
     public double measure(MetricConfig config, long now) {
+        long windowSizeMs = windowSize(config, now);
         double value = stat.measure(config, now);
-        return value / convert(windowSize(config, now), unit);
+        return value / convert(windowSizeMs, unit);
     }

     public long windowSize(MetricConfig config, long now) {
         // purge old samples before we compute the window size
-        stat.purgeObsoleteSamples(config, now);
+        long purgedUpToMs = stat.purgeObsoleteSamples(config, now);

         /*
          * Here we check the total amount of time elapsed since the oldest non-obsolete window.
          * This give the total windowSize of the batch which is the time used for Rate computation.
-         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
+         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30-second
          * window, the measured rate will be very high.
-         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         * Hence, we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
          *
          * Note that we could simply count the amount of time elapsed in the current window and add n-1 windows to get the total time,
          * but this approach does not account for sleeps. SampledStat only creates samples whenever record is called,
          * if no record is called for a period of time that time is not accounted for in windowSize and produces incorrect results.
          */
-        long totalElapsedTimeMs = now - stat.oldest(now).lastWindowMs;
+        long totalElapsedTimeMs = now - stat.oldest(now).startTimeMs;
         // Check how many full windows of data we have currently retained
         int numFullWindows = (int) (totalElapsedTimeMs / config.timeWindowMs());
         int minFullWindows = config.samples() - 1;

         // If the available windows are less than the minimum required, add the difference to the totalElapsedTime
-        if (numFullWindows < minFullWindows)
+        if (numFullWindows < minFullWindows) {
             totalElapsedTimeMs += (minFullWindows - numFullWindows) * config.timeWindowMs();
+        }
+
+        // if some part of considered interval was just purged with its data, exclude it
+        totalElapsedTimeMs = Math.min(totalElapsedTimeMs, now - purgedUpToMs);

Review Comment:
   This approach works for the newly added test case. However, I have two concerns.

   1. It's not very repeatable. For example, if `measure()` is called a second time after a very short interval, no samples will be purged, so we won't compensate for `purgedUpToMs`. We will then fall back to using the larger window, which results in a much lower measured value.
   2. The logic is a bit hard to follow now.

   Here is an alternative. Note that in the new test case, the oldest window actually partially overlaps with the total sample window, since the last recording falls in it. So, instead of purging it, we could keep the overlapping sample and include it in both the measure and the windowSize calculation. This could make the sample window a bit larger than what's configured, but it probably provides a more accurate and consistent measurement. We would need to extend `Sample` to remember the last recording time. This probably addresses both of the above concerns. What do you think?
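
The alternative proposed in the review comment (keep the oldest sample while its last recording still falls inside the total window, and count it in both the sum and the window size) could look roughly like the sketch below. This is a simplified, self-contained model and not the actual `Rate`/`SampledStat` code: the class `OverlapAwareRate`, its nested `Sample`, and the field name `lastEventMs` are hypothetical, the samples live in a deque rather than Kafka's fixed array, and the rate is always reported per second.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Simplified stand-in for Rate + SampledStat; all names here are hypothetical. */
final class OverlapAwareRate {

    /** A sample that also remembers when it was last written to. */
    private static final class Sample {
        final long startTimeMs;
        long lastEventMs;
        double value;

        Sample(long startTimeMs) {
            this.startTimeMs = startTimeMs;
            this.lastEventMs = startTimeMs;
        }
    }

    private final long timeWindowMs;
    private final int numSamples;
    private final Deque<Sample> samples = new ArrayDeque<>();

    OverlapAwareRate(long timeWindowMs, int numSamples) {
        if (timeWindowMs <= 0 || numSamples < 2) {
            throw new IllegalArgumentException("need timeWindowMs > 0 and numSamples >= 2");
        }
        this.timeWindowMs = timeWindowMs;
        this.numSamples = numSamples;
    }

    /** Assumes timeMs never decreases between calls. */
    void record(double value, long timeMs) {
        Sample current = samples.peekLast();
        if (current == null || timeMs - current.startTimeMs >= timeWindowMs) {
            current = new Sample(timeMs);
            samples.addLast(current);
        }
        current.value += value;
        current.lastEventMs = timeMs;
    }

    /** Drop a sample only once its LAST recording has left the total window. */
    private void purgeObsoleteSamples(long now) {
        long expireAgeMs = numSamples * timeWindowMs;
        while (!samples.isEmpty() && now - samples.peekFirst().lastEventMs >= expireAgeMs) {
            samples.removeFirst();
        }
    }

    long windowSizeMs(long now) {
        purgeObsoleteSamples(now);
        long oldestStartMs = samples.isEmpty() ? now : samples.peekFirst().startTimeMs;
        long totalElapsedTimeMs = now - oldestStartMs;
        int numFullWindows = (int) (totalElapsedTimeMs / timeWindowMs);
        int minFullWindows = numSamples - 1;
        // Same padding rule as the existing code: assume at least N-1 full windows.
        if (numFullWindows < minFullWindows) {
            totalElapsedTimeMs += (minFullWindows - numFullWindows) * timeWindowMs;
        }
        return totalElapsedTimeMs;
    }

    /** Rate per second over the retained samples, including an overlapping oldest one. */
    double measure(long now) {
        long windowMs = windowSizeMs(now); // purges before summing
        double total = 0.0;
        for (Sample sample : samples) {
            total += sample.value;
        }
        return total / (windowMs / 1000.0);
    }

    public static void main(String[] args) {
        OverlapAwareRate rate = new OverlapAwareRate(1000, 2);
        rate.record(10, 0);
        rate.record(10, 900);
        System.out.println(rate.measure(2500));
        System.out.println(rate.measure(2500)); // repeated call, same inputs
    }
}
```

In this model, with two 1000 ms samples and recordings at 0 ms and 900 ms, a start-time-based purge at 2500 ms would discard the only sample, whereas here it is kept because its last recording is 1600 ms old. The window becomes 2500 ms (slightly larger than the configured 2000 ms) and the rate is 20 / 2.5 = 8.0 per second. Because nothing depends on what a particular call happened to purge, calling `measure` again with the same `now` returns the same value, which is the repeatability concern from point 1.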