divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897838652


##########
clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java:
##########
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
 
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.
+     */
     protected List<Sample> samples;
 
     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
         this.samples = new ArrayList<>(2);
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * On every record, do the following:
+     * 1. Check if the current window has expired
+     * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest
+     *    possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals
+     *    from the end time of last known window.
+     * 3. Update the recorded value for the current window
+     * 4. Increase the number of event count
+     */
     @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample sample = current(timeMs);
-        if (sample.isComplete(timeMs, config))
-            sample = advance(config, timeMs);
-        update(sample, config, value, timeMs);
-        sample.eventCount += 1;
+    public void record(MetricConfig config, double value, long recordingTimeMs) {
+        Sample sample = current(recordingTimeMs);
+        if (sample.isComplete(recordingTimeMs, config)) {
+            final long previousWindowStartTime = sample.lastWindowMs;
+            final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
+            final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());

Review Comment:
   That is a great observation, Tom! Ideally, recording a metric should never 
block, because the operation is sensitive to wall-clock time. But as you 
observed, we have `synchronized` in multiple places, which may lead to a sample 
being recorded in a window that has already completed.
   
   When the `sensor` is used for calculating the ConnectionQuota, this problem 
wouldn't occur, because `Time.milliseconds` is called inside a `synchronized` 
block [1], which ensures that only one thread, holding the latest timestamp, 
accesses `sensor.record` at a time.
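
   As a rough illustration of why reading the clock inside the lock helps, 
here is a minimal sketch. It is not Kafka's actual code: the class and method 
names are hypothetical, and a simple counter stands in for the clock so the 
ordering is easy to check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch; none of these names exist in Kafka.
final class TimestampInsideLockSketch {
    // Stand-in for a clock (an assumption): every read returns a later "time".
    private final AtomicLong clock = new AtomicLong();
    // Timestamps in the order in which they reached the recorder.
    private final List<Long> recorded = new ArrayList<>();

    // The clock is read while holding the lock, so no other thread can get
    // between reading the time and recording with it.
    synchronized void record() {
        long nowMs = clock.incrementAndGet();
        recorded.add(nowMs);
    }

    synchronized List<Long> recorded() {
        return new ArrayList<>(recorded);
    }

    // Runs several threads against one recorder and reports whether the
    // timestamps arrived in strictly increasing order.
    static boolean runAndCheckOrdered(int threads, int recordsPerThread) {
        TimestampInsideLockSketch sketch = new TimestampInsideLockSketch();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread worker = new Thread(() -> {
                for (int i = 0; i < recordsPerThread; i++) {
                    sketch.record();
                }
            });
            workers.add(worker);
            worker.start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        List<Long> seen = sketch.recorded();
        if (seen.size() != threads * recordsPerThread) {
            return false;
        }
        for (int i = 1; i < seen.size(); i++) {
            if (seen.get(i) <= seen.get(i - 1)) {
                return false;
            }
        }
        return true;
    }
}
```

   If the clock were read before entering `record()`, a thread could block on 
the lock and then record with a timestamp older than one already recorded by 
another thread.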
   
   However, I don't know about the code paths other than ConnectionQuota that 
use a sensor, and your observation is valid.
   
   Since this problem is independent of this code change and already breaks 
the existing logic whenever `recordingTimeMs < endTimeOfPreviousWindow`, I have 
created a JIRA to address it in a separate PR: 
https://issues.apache.org/jira/browse/KAFKA-13994
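
   To make the arithmetic concrete, here is a minimal sketch of the 
window-start computation from the diff above, pulled into a standalone helper. 
The class and method names are hypothetical, and it models only the formula, 
not `SampledStat` itself. It relies on Java's `%` taking the sign of the 
dividend, so a recording time earlier than the previous window's end yields a 
negative remainder.

```java
// Hypothetical standalone helper; it mirrors only the formula in the diff,
// not the real SampledStat class.
final class WindowStartSketch {

    // Start of the new window, aligned to windowMs intervals counted from
    // the end of the previous window.
    static long startTimeOfNewWindow(long recordingTimeMs, long previousWindowStartMs, long windowMs) {
        long previousWindowEndMs = previousWindowStartMs + windowMs;
        // Java's % keeps the sign of the dividend, so this remainder is
        // negative when recordingTimeMs < previousWindowEndMs.
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % windowMs);
    }

    public static void main(String[] args) {
        // Previous window started at 0 with a 1000 ms length; record at 2500.
        System.out.println(startTimeOfNewWindow(2500L, 0L, 1000L));
        // Record stamped 900, which is before the previous window's end (1000).
        System.out.println(startTimeOfNewWindow(900L, 0L, 1000L));
    }
}
```

   With a 1000 ms window that started at 0, a record at 2500 gets a window 
starting at 2000, as intended. A record stamped 900 gets a computed start of 
1000, which is later than the record itself.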
   
   [1] 
https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542
 


