divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897838652
########## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##########
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.
+     */
     protected List<Sample> samples;
     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
         this.samples = new ArrayList<>(2);
     }
+    /**
+     * {@inheritDoc}
+     *
+     * On every record, do the following:
+     * 1. Check if the current window has expired
+     * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest
+     * possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals
+     * from the end time of last known window.
+     * 3. Update the recorded value for the current window
+     * 4. Increase the number of event count
+     */
     @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample sample = current(timeMs);
-        if (sample.isComplete(timeMs, config))
-            sample = advance(config, timeMs);
-        update(sample, config, value, timeMs);
-        sample.eventCount += 1;
+    public void record(MetricConfig config, double value, long recordingTimeMs) {
+        Sample sample = current(recordingTimeMs);
+        if (sample.isComplete(recordingTimeMs, config)) {
+            final long previousWindowStartTime = sample.lastWindowMs;
+            final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
+            final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());

Review Comment:
That is a great observation, Tom! Ideally, recording a metric should not block, because the operation is sensitive to wall-clock time.
But as you observed, we have `synchronized` in multiple places, which may lead to a sample being recorded in a window that has already completed. When the `sensor` is used to calculate the ConnectionQuota, this problem doesn't occur, because `Time.milliseconds` is read inside a `synchronized` block [1], which ensures that only one thread at a time, holding the latest timestamp, calls `sensor.record`. However, I don't know about code paths other than ConnectionQuota that use a sensor, so your observation is valid. Since this problem is independent of this code change, and it breaks the existing logic if/when `recordingTimeMs < endTimeOfPreviousWindow`, I have created a JIRA to address it in a separate PR: https://issues.apache.org/jira/browse/KAFKA-13994

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542
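To make the `recordingTimeMs < endTimeOfPreviousWindow` case concrete, here is a minimal standalone sketch of only the window-alignment arithmetic from the diff (`recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs())`). The class `WindowAlignmentSketch`, its `startTimeOfNewWindow` helper, and the 1000 ms window size are hypothetical illustrations; the sketch does not use Kafka's `SampledStat` or `MetricConfig`, and it does not model when `isComplete` actually triggers an advance.

```java
// Hypothetical standalone sketch: reproduces only the window-alignment
// arithmetic from the diff, outside of Kafka's SampledStat/MetricConfig.
public class WindowAlignmentSketch {

    // Mirrors the diff:
    // recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % timeWindowMs)
    static long startTimeOfNewWindow(long recordingTimeMs, long previousWindowStartMs, long timeWindowMs) {
        final long previousWindowEndMs = previousWindowStartMs + timeWindowMs;
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % timeWindowMs);
    }

    public static void main(String[] args) {
        final long windowMs = 1_000;
        final long previousStartMs = 10_000; // previous window covers [10000, 11000)

        // In-order timestamp, 2.5 windows after the previous window ended:
        // the new window starts on the latest 1000 ms boundary (counted from
        // 11000) that is not after the recording time.
        long inOrder = startTimeOfNewWindow(13_500, previousStartMs, windowMs);
        System.out.println("in-order start = " + inOrder); // prints "in-order start = 13000"

        // Stale timestamp that precedes the end of the previous window.
        // Java's % keeps the sign of the dividend, so the remainder is -300
        // and the computed start ends up AFTER the recording time.
        long stale = startTimeOfNewWindow(10_700, previousStartMs, windowMs);
        System.out.println("stale start = " + stale); // prints "stale start = 11000"
    }
}
```

With the in-order timestamp the recording time (13500) falls inside the new window [13000, 14000). With the stale timestamp (10700) the computed window [11000, 12000) starts after the sample it is supposed to hold, which is one way the arithmetic misbehaves once timestamps can arrive out of order.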