[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15211302#comment-15211302
 ] 

Jay Kreps commented on KAFKA-3456:
----------------------------------

I think the tradeoff is between getting a current estimate and using the same 
window length for all estimates--it's hard to do both efficiently without an 
exponential weighting scheme which has bigger drawbacks still. We need to get a 
current estimate because quotas need to shut off abusers right away and also 
because inherently the goal of monitoring and alerting is to detect bad things 
quickly. We did originally consider the approach you are proposing and rejected 
it after thinking more about what seemed important for the use case.

I think you are arguing that if you have alternating behavior, with 30 
seconds of 1 req/sec and 30 seconds of 999 req/sec, the "right" answer is a 
flat estimate of 500 req/sec. I would argue that 500 req/sec is actually not 
a better answer than having periods of 1, periods of 999, and periods of 
everything in between.

I think philosophically this is not the right way to think about things. I 
don't think in practice you can truly control the rate at which the stat is 
checked, and, more importantly, there isn't a "true" underlying rate that can 
be known. Rather, the rate will always depend on the window, and the goal is 
to give a good estimate of the "current" rate at the point in time you are 
asked. There is no "right" answer to this question--you can use more 
historical data to produce the estimate, in which case it will update more 
slowly when the underlying behavior changes, or you can use less historical 
data, in which case you will see more natural variance due to luck. Not using 
the most recent data isn't good though, since the most recent data is going 
to be the most predictive of the current value.
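
For concreteness, here is a minimal sketch of what such an exponential 
weighting scheme could look like (illustrative only, not Kafka code; the 
class name, half-life parameter, and field names are invented for the 
example). Every observation is folded into one decayed total, so the estimate 
always emphasizes the most recent data, but old data never fully ages out and 
the effective window length is only approximate, which is part of the 
drawback mentioned above:

{code:java}
import java.util.concurrent.TimeUnit;

// Illustrative only: an exponentially weighted rate estimator, not Kafka code.
public class ExpWeightedRate {
    private final double halfLifeMs;   // how quickly old observations fade
    private double weightedSum = 0.0;  // decayed sum of recorded values
    private double weightedMs = 0.0;   // decayed amount of elapsed time
    private long lastMs = -1;

    public ExpWeightedRate(long halfLife, TimeUnit unit) {
        this.halfLifeMs = unit.toMillis(halfLife);
    }

    public void record(double value, long nowMs) {
        decayTo(nowMs);
        weightedSum += value;
    }

    // Per-second rate, dominated by the most recent observations.
    public double measure(long nowMs) {
        decayTo(nowMs);
        return weightedMs == 0.0 ? 0.0 : weightedSum / (weightedMs / 1000.0);
    }

    private void decayTo(long nowMs) {
        if (lastMs >= 0 && nowMs > lastMs) {
            double decay = Math.pow(0.5, (nowMs - lastMs) / halfLifeMs);
            weightedSum *= decay;
            // elapsed time is decayed the same way, then the new interval is added
            weightedMs = weightedMs * decay + (nowMs - lastMs);
        }
        lastMs = nowMs;
    }
}
{code}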



> In-house KafkaMetric misreports metrics when periodically observed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3456
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, core, producer 
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>            Reporter: The Data Lorax
>            Assignee: Neha Narkhede
>            Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} are 
> misreported if observed in a periodic manner.
> Consider a {{Rate}} metric that uses the default 2 samples and 30 second 
> sample window, i.e. the {{Rate}} captures 60 seconds' worth of data. To 
> report this metric to some external system we might poll it every 60 seconds 
> to observe the current value. Using a shorter period would, in the case of a 
> {{Rate}}, lead to smoothing of the plotted data and, worse, in the case of a 
> {{Count}}, would lead to double counting - so 60 seconds is the only period 
> at which we can poll the metrics if we are to report them accurately.
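> To make the double counting concrete, the fragment below (a sketch only, 
> assuming the same imports and {{Count}} / {{MetricConfig}} API as the test 
> class further down, and not part of that test) polls a {{Count}} every 30 
> seconds while recording one event per second; because each poll covers 
> roughly the last 60 seconds, most events are visible to two consecutive 
> polls and the summed polls far exceed the 120 events actually recorded:
> {code:java}
> // Sketch only - not part of the test class below.
> Count count = new Count();
> MetricConfig config = new MetricConfig();
> config.samples(2);
> config.timeWindow(30, TimeUnit.SECONDS);
> double summedPolls = 0.0;
> // 120 seconds of exactly 1 event per second, polled every 30 seconds:
> for (long time = 0; time < 120000; time += 1000) {
>     count.record(config, 1, time);
>     if ((time + 1000) % 30000 == 0) {
>         // Each poll still sees the events of the previous 30 second bucket.
>         summedPolls += count.measure(config, time);
>     }
> }
> // summedPolls is well above 120, the number of events actually recorded.
> {code}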
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how far our periodic poll is offset into the 
> metric's 60 second period, we would observe a constant rate somewhere in the 
> range of 1 to 500 per second, most likely around the 250 mark. 
> Other metrics based on the {{SampledStat}} type suffer from the same issue, 
> e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> import static org.junit.Assert.assertEquals;
> import static org.junit.Assert.assertTrue;
> import java.util.concurrent.TimeUnit;
> import org.apache.kafka.common.metrics.MetricConfig;
> import org.apache.kafka.common.metrics.stats.Count;
> import org.apache.kafka.common.metrics.stats.Rate;
> import org.junit.Before;
> import org.junit.Test;
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>     }
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure at bucket boundary (though the same issue exists for all periodic measurements):
>         final double m1 = rate.measure(metricsConfig, t(2));    // m1 = 1.0
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4));    // m2 = 1.0
>         assertEquals("Measurement of the rate over the first two samples", 
> 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 
> 500.0, m2, 2.0);
>     }
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws Exception {
>         final long retardation = 1000;
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m1 = 0.0;
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(2) + retardation) {
>                 m1 = rate.measure(metricsConfig, time); // m1 = 65.something
>             }
>         }
>         // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m2 = 0.0;
>         // Fifth sample window from t4 -> (t5 -1), with rate 999 per second:
>         for (long time = t(4); time != t(5); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(4) + retardation) {
>                 m2 = rate.measure(metricsConfig, time); // m2 = 65.something
>             }
>         }
>         assertTrue("Measurement of the rate over the first two samples should 
> be well over 250 per sec", m1 > 250.0);
>         assertTrue("Measurement of the rate over the last two samples should 
> be well over 250 per sec", m2 > 250.0);
>     }
>     @Test
>     public void testHowCountDropsMetrics() throws Exception {
>         Count count = new Count();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // Record count for constant rate of 1 per second over entire 60 second period:
>         for (long time = t(0); time != t(2); time += 1000) {
>             count.record(metricsConfig, 1, time);
>         }
>         final double m1 = count.measure(metricsConfig, t(2) -1);    // m1 = 60
>         count.record(metricsConfig, 1, t(2));
>         final double m2 = count.measure(metricsConfig, t(2));       // m2 = 31
>         assertEquals("Measurement of the count at end of the 60 second 
> period", 60, m1, 0.1);
>         assertEquals("Measurement of the count at 1ms after the 60 second 
> period", 60, m2, 0.1);
>     }
> }
> {code}
> I'm happy to work on the solution to this, but first want to check I've not 
> missed something and I'm not doing something stupid. I think my reasoning is 
> sound.
> One solution would be to keep 'n + 1' samples, and only combine the complete 
> 'n' samples when asked for a value. While this would provide correct 
> metrics, they would respond to any change in a delayed manner, with the 
> maximum delay being the configured length of one sample, e.g. by default a 
> 30s maximum and 15s average delay. 
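> A rough sketch of that idea follows (hypothetical and invented purely to 
> make the proposal concrete, not a patch against {{SampledStat}}): keep one 
> extra in-progress bucket, record only into it, and report only over the 'n' 
> completed buckets, so a poll never sees a partially filled window.
> {code:java}
> import java.util.concurrent.TimeUnit;
> // Hypothetical sketch of the "n + 1 samples" proposal, not a Kafka patch.
> public class CompletedWindowRate {
>     private final long windowMs;
>     private final int samples;      // number of completed buckets reported
>     private final double[] totals;  // samples + 1 buckets, last one in progress
>     private long windowStartMs = -1;
>     public CompletedWindowRate(int samples, long window, TimeUnit unit) {
>         this.samples = samples;
>         this.windowMs = unit.toMillis(window);
>         this.totals = new double[samples + 1];
>     }
>     public void record(double value, long nowMs) {
>         roll(nowMs);
>         totals[samples] += value;   // always record into the in-progress bucket
>     }
>     // Rate per second over the most recently completed buckets only.
>     public double measure(long nowMs) {
>         roll(nowMs);
>         double sum = 0.0;
>         for (int i = 0; i < samples; i++) {
>             sum += totals[i];
>         }
>         return sum / (samples * windowMs / 1000.0);
>     }
>     private void roll(long nowMs) {
>         if (windowStartMs < 0) {
>             windowStartMs = nowMs;
>         }
>         // Each time the in-progress bucket fills, shift it into the completed set.
>         while (nowMs - windowStartMs >= windowMs) {
>             System.arraycopy(totals, 1, totals, 0, samples);
>             totals[samples] = 0.0;
>             windowStartMs += windowMs;
>         }
>     }
> }
> {code}
> With the alternating 999/1 example above, every poll of this sketch would 
> report roughly 500 per sec regardless of its offset into the period, at the 
> cost of lagging any real change by up to one full sample window.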
> Which is better: correct delayed stats or incorrect responsive stats? Tough 
> question! It depends on the use-case for the stats - which I've noticed 
> includes internal quotas.  Personally, I'd err towards correct stats. But I'm 
> interested to hear others' thoughts...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)