[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897838652

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.
+     */
     protected List<Sample> samples;
     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
         this.samples = new ArrayList<>(2);
     }
+    /**
+     * {@inheritDoc}
+     *
+     * On every record, do the following:
+     * 1. Check if the current window has expired
+     * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest
+     *    possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals
+     *    from the end time of last known window.
+     * 3. Update the recorded value for the current window
+     * 4. Increase the number of event count
+     */
     @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample sample = current(timeMs);
-        if (sample.isComplete(timeMs, config))
-            sample = advance(config, timeMs);
-        update(sample, config, value, timeMs);
-        sample.eventCount += 1;
+    public void record(MetricConfig config, double value, long recordingTimeMs) {
+        Sample sample = current(recordingTimeMs);
+        if (sample.isComplete(recordingTimeMs, config)) {
+            final long previousWindowStartTime = sample.lastWindowMs;
+            final long previousWindowEndtime = previousWindowStartTime + config.timeWindowMs();
+            final long startTimeOfNewWindow = recordingTimeMs - ((recordingTimeMs - previousWindowEndtime) % config.timeWindowMs());

Review Comment:
That is a great observation, Tom! Ideally, recording a metric should never block, because the operation is sensitive to wall-clock time.

But as you observed, we have `synchronized` in multiple places, which may lead to a sample being recorded in a window that has already completed. When the `sensor` is used for calculating the ConnectionQuota, this problem does not occur, because `Time.milliseconds` is called inside a `synchronized` block [1], which ensures that only one thread, holding the latest timestamp, accesses `sensor.record` at a time. However, I don't know about the code paths other than ConnectionQuota that use the sensor, so your observation is valid.

Since this problem is independent of this code change and breaks the existing logic if/when recordingTimeMs < endTimeOfPreviousWindow, I have created a JIRA to address it in a separate PR: https://issues.apache.org/jira/browse/KAFKA-13994

[1] https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L1541-L1542

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
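To make the `recordingTimeMs < endTimeOfPreviousWindow` case concrete, here is a minimal sketch that isolates the `startTimeOfNewWindow` expression from the diff above. The class and method names are hypothetical and this is not the Kafka implementation; it relies only on Java's rule that `%` takes the sign of the dividend.

```java
// Hypothetical helper for illustration; not part of Kafka.
final class WindowAlignmentSketch {

    /**
     * Same arithmetic as the expression in the diff: step back from the
     * recording time to the nearest window boundary, where boundaries are
     * spaced windowMs apart starting at the end of the previous window.
     */
    static long startOfNewWindow(long recordingTimeMs, long previousWindowStartMs, long windowMs) {
        long previousWindowEndMs = previousWindowStartMs + windowMs;
        return recordingTimeMs - ((recordingTimeMs - previousWindowEndMs) % windowMs);
    }

    public static void main(String[] args) {
        long windowMs = 1_000L;
        // Previous window covered [0, 1000). A record at 2350 ms is aligned
        // to the boundary at 2000 ms.
        System.out.println(startOfNewWindow(2_350L, 0L, windowMs));
        // A late record at 900 ms, before the previous window ended:
        // (900 - 1000) % 1000 is -100 in Java, so the result is 900 + 100.
        System.out.println(startOfNewWindow(900L, 0L, windowMs));
    }
}
```

In the second call the computed window start (1000 ms) is later than the record itself (900 ms), which is one way the out-of-order case could misbehave if this expression were ever evaluated with a stale timestamp.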
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897722547

## clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java: ##
@@ -608,14 +609,14 @@ public void testRateWindowing() throws Exception {
         time.sleep(cfg.timeWindowMs() / 2);
         // prior to any time passing
-        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + cfg.timeWindowMs() / 2) / 1000.0;
+        double elapsedSecs = (cfg.timeWindowMs() * (cfg.samples() - 1) + (((double) cfg.timeWindowMs()) / 2.0d)) / 1000.0d;

Review Comment:
Thanks for catching this. I have fixed this in the latest revision.
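As an aside on why the cast in the changed line matters: with `long` operands, `timeWindowMs / 2` truncates before the division by `1000.0` is applied. The sketch below compares the two expressions from the diff using plain parameters; the class and method names are hypothetical stand-ins for the test code.

```java
// Hypothetical stand-in for the two elapsedSecs expressions in the diff.
final class ElapsedSecondsSketch {

    // Old form: windowMs / 2 is integer division and drops any half millisecond.
    static double truncating(long windowMs, int samples) {
        return (windowMs * (samples - 1) + windowMs / 2) / 1000.0;
    }

    // New form: the half window is computed in floating point.
    static double exact(long windowMs, int samples) {
        return (windowMs * (samples - 1) + (((double) windowMs) / 2.0d)) / 1000.0d;
    }

    public static void main(String[] args) {
        // An odd window length exposes the difference of half a millisecond.
        System.out.println(truncating(1_001L, 2));
        System.out.println(exact(1_001L, 2));
        // For an even window length both forms agree.
        System.out.println(truncating(1_000L, 2) == exact(1_000L, 2));
    }
}
```

The two forms differ only when the window length in milliseconds is odd, so the change mainly guards the test against such configurations.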
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897733191

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.

Review Comment:
That sounds fair. I have fixed the Javadoc in the latest revision as per your suggestion.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897733191

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -34,35 +35,56 @@
  */
 public abstract class SampledStat implements MeasurableStat {
-    private double initialValue;
+    private final double initialValue;
+    /**
+     * Index of the latest stored sample.
+     */
     private int current = 0;
+    /**
+     * Stores the recorded samples in a ring buffer.

Review Comment:
That sounds fair. I have fixed the Javadoc in the latest revision and replaced it with the following:
```
/**
 * Stores the recorded samples.
 * Note that the previously recorded samples may be overwritten/reset if they are considered obsolete by the
 * {@link Sample#purgeObsoleteSamples} function.
 */
```
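For readers unfamiliar with the "overwritten/reset" behaviour the Javadoc refers to, the following is a simplified sketch of the purge rule as it appears in the diffs quoted elsewhere in this thread, where `purgeObsoleteSamples` is declared on `SampledStat`. The `Sample` type here is a stripped-down stand-in, and the body of `reset` (restart the slot at `now` with no data) is an assumption, not copied from Kafka.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in types for illustration; not the Kafka classes.
final class PurgeSketch {

    static final class Sample {
        long lastWindowMs;
        long eventCount;
        double value;

        Sample(long startMs) {
            this.lastWindowMs = startMs;
        }

        // Assumed reset semantics: the slot is reused, restarting at `now`.
        void reset(long now) {
            this.lastWindowMs = now;
            this.eventCount = 0;
            this.value = 0.0;
        }
    }

    // A sample is obsolete once its window started at least
    // numSamples * windowMs before `now`.
    static void purgeObsoleteSamples(List<Sample> samples, int numSamples, long windowMs, long now) {
        long expireAge = numSamples * windowMs;
        for (Sample sample : samples) {
            if (now - sample.lastWindowMs >= expireAge)
                sample.reset(now);
        }
    }

    // Builds two samples starting at 0 ms and 1000 ms (1000 ms windows,
    // 2 samples), purges at `now`, and returns the resulting start times.
    static long[] startsAfterPurge(long now) {
        List<Sample> samples = new ArrayList<>(2);
        samples.add(new Sample(0L));
        samples.add(new Sample(1_000L));
        purgeObsoleteSamples(samples, 2, 1_000L, now);
        return new long[] {samples.get(0).lastWindowMs, samples.get(1).lastWindowMs};
    }

    public static void main(String[] args) {
        long[] starts = startsAfterPurge(2_500L);
        System.out.println(starts[0] + ", " + starts[1]);
    }
}
```

At `now = 2500` the first sample is 2500 ms old and is reset in place, while the second is only 1500 ms old and is kept, which is why the list is better described as reusable slots than as a strict ring buffer.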
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897724648

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -84,13 +106,7 @@ public Sample current(long timeMs) {
     public Sample oldest(long now) {
         if (samples.size() == 0)
             this.samples.add(newSample(now));
-        Sample oldest = this.samples.get(0);
-        for (int i = 1; i < this.samples.size(); i++) {
-            Sample curr = this.samples.get(i);
-            if (curr.lastWindowMs < oldest.lastWindowMs)
-                oldest = curr;
-        }
-        return oldest;
+        return samples.stream().min(Comparator.comparingLong(s -> s.lastWindowMs)).orElse(samples.get(0));

Review Comment:
I find the new code more readable, since we can see at a glance that a minimum is being calculated, whereas in the previous version we have to follow the assignments and the logic in the for loop to work out what is going on. Nevertheless, I don't have a strong opinion on this one. If you still think we need to revert it, I will do so. Let me know.
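For comparison, here are the two variants side by side in a self-contained form. `Sample` is a minimal stand-in with only the `lastWindowMs` field, and the class name is hypothetical; the method bodies follow the removed and added lines of the diff.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Minimal stand-in for illustration; not the Kafka classes.
final class OldestSampleSketch {

    static final class Sample {
        final long lastWindowMs;

        Sample(long lastWindowMs) {
            this.lastWindowMs = lastWindowMs;
        }
    }

    // Loop variant, as in the removed lines of the diff.
    static Sample oldestWithLoop(List<Sample> samples) {
        Sample oldest = samples.get(0);
        for (int i = 1; i < samples.size(); i++) {
            Sample curr = samples.get(i);
            if (curr.lastWindowMs < oldest.lastWindowMs)
                oldest = curr;
        }
        return oldest;
    }

    // Stream variant, as in the added line of the diff. Note that
    // samples.get(0) is evaluated eagerly, so the list must be non-empty.
    static Sample oldestWithStream(List<Sample> samples) {
        return samples.stream()
            .min(Comparator.comparingLong((Sample s) -> s.lastWindowMs))
            .orElse(samples.get(0));
    }

    public static void main(String[] args) {
        List<Sample> samples = Arrays.asList(new Sample(300L), new Sample(100L), new Sample(200L));
        System.out.println(oldestWithLoop(samples).lastWindowMs);
        System.out.println(oldestWithStream(samples).lastWindowMs);
    }
}
```

Both variants assume a non-empty list, which `oldest` guarantees in the diff by adding a new sample first.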
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897724922

## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ##
@@ -68,24 +68,55 @@ public double measure(MetricConfig config, long now) {
     }
     public long windowSize(MetricConfig config, long now) {
-        // purge old samples before we compute the window size
+        // Purge obsolete samples. Obsolete samples are the ones which are not relevant to the current calculation
+        // because their creation time is outside (before) the duration of time window used to calculate rate.
         stat.purgeObsoleteSamples(config, now);
         /*
          * Here we check the total amount of time elapsed since the oldest non-obsolete window.
-         * This give the total windowSize of the batch which is the time used for Rate computation.
-         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
-         * window, the measured rate will be very high.
-         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         * This gives the duration of computation time window which used to calculate Rate.

Review Comment:
Thanks for catching this. I have fixed this in the latest revision.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r897721779

## clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java: ##
@@ -149,13 +149,14 @@ private void verifyStats(Function metricValueFunc) {
         assertEquals(5.0, metricValueFunc.apply(metrics.metric(metrics.metricName("s2.total", "grp1"))), EPS,
             "s2 reflects the constant value");
-        assertEquals(4.5, metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), EPS,
+        assertEquals(sum / (double) count, metricValueFunc.apply(metrics.metric(metrics.metricName("test.avg", "grp1"))), EPS,
             "Avg(0...9) = 4.5");
         assertEquals(count - 1, metricValueFunc.apply(metrics.metric(metrics.metricName("test.max", "grp1"))), EPS,
             "Max(0...9) = 9");
         assertEquals(0.0, metricValueFunc.apply(metrics.metric(metrics.metricName("test.min", "grp1"))), EPS,
             "Min(0...9) = 0");
-        assertEquals(sum / elapsedSecs, metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), EPS,
+        // rate is calculated over the first ever window. Hence, we assume presence of prior windows with 0 recorded events.
+        assertEquals((double) sum / elapsedSecs, metricValueFunc.apply(metrics.metric(metrics.metricName("test.rate", "grp1"))), EPS,

Review Comment:
Thanks for catching this. I have fixed this in the latest revision.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862915029

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -34,22 +34,38 @@
  */
 public abstract class SampledStat implements MeasurableStat {
-    private double initialValue;
+    private final double initialValue;
     private int current = 0;
+
     protected List<Sample> samples;
     public SampledStat(double initialValue) {
         this.initialValue = initialValue;
         this.samples = new ArrayList<>(2);
     }
+    /**
+     * {@inheritDoc}
+     *
+     * On every record, do the following:
+     * 1. Check if the current window has expired
+     * 2. If yes, then advance the current pointer to new window. The start time of the new window is set to nearest
+     *    possible starting point for the new window. The nearest starting point occurs at config.timeWindowMs intervals
+     *    from the end time of last known window.
+     * 3. Update the recorded value for the current window
+     * 4. Increase the number of event count
+     */
     @Override
-    public void record(MetricConfig config, double value, long timeMs) {
-        Sample sample = current(timeMs);
-        if (sample.isComplete(timeMs, config))
-            sample = advance(config, timeMs);
-        update(sample, config, value, timeMs);
-        sample.eventCount += 1;
+    public void record(MetricConfig config, double value, long recordingTimeMs) {
+        Sample sample = current(recordingTimeMs);
+        if (sample.isComplete(recordingTimeMs, config)) {
+            final long previousWindowStartTime = sample.getLastWindowMs();
+            sample = advance(config, recordingTimeMs);

Review Comment:
Modified the code to make it more readable. It is not exactly what you mentioned but should be more readable now.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862909115

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -138,6 +170,46 @@ public boolean isComplete(long timeMs, MetricConfig config) {
             return timeMs - lastWindowMs >= config.timeWindowMs() || eventCount >= config.eventWindow();
         }
+        public boolean isActive() {

Review Comment:
All changes to public API have been reverted. Addressing the core fix does not require any changes to the public APIs.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862908720

## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ##
@@ -52,10 +51,6 @@ public Rate(TimeUnit unit, SampledStat stat) {
         this.unit = unit;
     }
-    public String unitName() {

Review Comment:
All changes to public API have been reverted. Addressing the core fix does not require any changes to the public APIs.

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -110,25 +127,40 @@ public String toString() {
     protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (Sample sample : samples) {
-            if (now - sample.lastWindowMs >= expireAge)
+            if (now - sample.getLastWindowMs() >= expireAge)
                 sample.reset(now);
         }
     }
     protected static class Sample {
-        public double initialValue;
-        public long eventCount;
-        public long lastWindowMs;
-        public double value;
+        private double initialValue;

Review Comment:
All changes to public API have been reverted. Addressing the core fix does not require any changes to the public APIs.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862908241

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -110,25 +127,40 @@ public String toString() {
     protected void purgeObsoleteSamples(MetricConfig config, long now) {
         long expireAge = config.samples() * config.timeWindowMs();
         for (Sample sample : samples) {
-            if (now - sample.lastWindowMs >= expireAge)
+            if (now - sample.getLastWindowMs() >= expireAge)
                 sample.reset(now);
         }
     }
     protected static class Sample {
-        public double initialValue;
-        public long eventCount;
-        public long lastWindowMs;
-        public double value;
+        private double initialValue;
+        private long eventCount;
+        private long lastWindowMs;
+        private double value;
+
+        /**
+         * A Sample object could be re-used in a ring buffer to store future samples for space efficiency.
+         * Thus, a sample could be in either of the following lifecycle states:
+         * NOT_INITIALIZED: Sample has not been initialized.
+         * ACTIVE: Sample has values and is currently
+         * RESET: Sample has been reset and the object is not destroyed so that it could be used for storing future
+         *        samples.
+         */
+        private enum LifecycleState {

Review Comment:
This code has been removed.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862908032

## clients/src/main/java/org/apache/kafka/common/metrics/stats/SampledStat.java: ##
@@ -87,8 +103,9 @@ public Sample oldest(long now) {
         Sample oldest = this.samples.get(0);
         for (int i = 1; i < this.samples.size(); i++) {
             Sample curr = this.samples.get(i);
-            if (curr.lastWindowMs < oldest.lastWindowMs)
+            if ((curr.getLastWindowMs() < oldest.getLastWindowMs()) && curr.isActive()) { // only consider active samples

Review Comment:
I have removed these changes completely. They were mostly useful as defensive programming, but changing the public APIs would have required a KIP and would also have complicated this code review. I will file a separate PR to add these defensive checks back.
[GitHub] [kafka] divijvaidya commented on a diff in pull request #12045: KAFKA-12319: Change calculation of window size used to calculate `Rate`
divijvaidya commented on code in PR #12045:
URL: https://github.com/apache/kafka/pull/12045#discussion_r862906685

## clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java: ##
@@ -68,28 +63,61 @@ public double measure(MetricConfig config, long now) {
     }
     public long windowSize(MetricConfig config, long now) {
-        // purge old samples before we compute the window size
+        // Purge obsolete samples. Obsolete samples are the ones which are not relevant to the current calculation
+        // because their creation time is outside (before) the duration of time window used to calculate rate.
         stat.purgeObsoleteSamples(config, now);
         /*
          * Here we check the total amount of time elapsed since the oldest non-obsolete window.
-         * This give the total windowSize of the batch which is the time used for Rate computation.
-         * However, there is an issue if we do not have sufficient data for e.g. if only 1 second has elapsed in a 30 second
-         * window, the measured rate will be very high.
-         * Hence we assume that the elapsed time is always N-1 complete windows plus whatever fraction of the final window is complete.
+         * This gives the duration of computation time window which used to calculate Rate.
+         *
+         * For scenarios when rate computation is performed after at least `config.samples` have completed,
+         * the duration of computation time window is determined by:
+         * window duration = (now - start time of oldest non-obsolete window)
+         *
+         * ## Special case: First ever window
+         * A special scenario occurs when rate calculation is performed before at least `config.samples` have completed
+         * (e.g. if only 1 second has elapsed in a 30 second). In such a scenario, window duration would be equal to the
+         * time elapsed in the current window (since oldest non-obsolete window is current window). This leads to the
+         * following values for rate. Consider the following example:
+         * config.timeWindowMs() = 1s
+         * config.samples() = 2
+         * Record events (E) at timestamps:
+         * E1 = CurrentTimeStamp (T1)
+         * E2 = T1 + 30ms
+         * E2 = T1 + 60ms

Review Comment:
Fixed. Thanks for pointing this out.
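To put numbers on the example in the quoted comment, the sketch below computes a rate for three events at T1, T1 + 30 ms and T1 + 60 ms, assuming the rate is measured at T1 + 60 ms. It contrasts dividing by the elapsed time alone with dividing by N-1 assumed empty windows plus the elapsed fraction, as described in the removed comment. This is an illustration of the reasoning only, with hypothetical names; it is not the `Rate.windowSize` implementation from the PR.

```java
// Hypothetical illustration of the first-window special case; not Kafka code.
final class FirstWindowRateSketch {

    // Divides by the time elapsed in the current (first) window only.
    static double naiveRatePerSec(long eventCount, long elapsedInCurrentWindowMs) {
        return eventCount / (elapsedInCurrentWindowMs / 1000.0);
    }

    // Assumes (samples - 1) complete prior windows with zero events, plus
    // the elapsed part of the current window.
    static double paddedRatePerSec(long eventCount, long elapsedInCurrentWindowMs, long windowMs, int samples) {
        long assumedElapsedMs = windowMs * (samples - 1) + elapsedInCurrentWindowMs;
        return eventCount / (assumedElapsedMs / 1000.0);
    }

    public static void main(String[] args) {
        long windowMs = 1_000L; // config.timeWindowMs() = 1s
        int samples = 2;        // config.samples() = 2
        long events = 3;        // recorded at T1, T1 + 30 ms, T1 + 60 ms
        long elapsedMs = 60L;   // assumed measurement time: T1 + 60 ms

        System.out.println(naiveRatePerSec(events, elapsedMs));
        System.out.println(paddedRatePerSec(events, elapsedMs, windowMs, samples));
    }
}
```

Under these assumptions the naive form reports roughly 50 events per second from only three events, while the padded form reports roughly 2.8, which is the kind of inflation the special case is meant to avoid.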