[ 
https://issues.apache.org/jira/browse/KAFKA-3456?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

The Data Lorax updated KAFKA-3456:
----------------------------------
    Description: 
The metrics captured by Kafka through the in-house {{SampledStat}} suffer from 
misreporting metrics if observed in a periodic manner.

Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, to 
report this metric to some external system we might poll it every 60 seconds to 
observe the current value. Using a shorter period would, in the case of a 
{{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a 
{{Count}}, would lead to double counting - so 60 seconds is the only period at 
which we can poll the metrics if we are to report accurate metrics.

To demonstrate the issue consider the following somewhat extreme case:

The {{Rate}}  is capturing data from a system which alternates between a 999 
per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
seconds the first sample within the {{Rate}} instance will have a rate of 999 
per sec, and the second 1 per sec. 

If we were to ask the metric for its value at this 60 second boundary it would 
correctly report 500 per sec. However, if we asked it again 1 millisecond later 
it would report 1 per sec, as the first sample window has been aged out. 
Depending on how far delayed into the 60 sec period of the metric our periodic 
poll of the metric was, we would observe a constant rate somewhere in the range 
of 1 to 500 per second, most likely around the 250 mark. 

Other metrics based off of the {{SampledStat}} type suffer from the same issue 
e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a 
constant count somewhere between 30 and 60, rather than the correct 60.

This can be seen in the following test code:

{code:java}
public class MetricsTest {
    private MetricConfig metricsConfig;

    @Before
    public void setUp() throws Exception {
        metricsConfig = new MetricConfig();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(10, TimeUnit.SECONDS);
    }

    private long t(final int bucket) {
        return metricsConfig.timeWindowMs() * bucket;
    }

    @Test
    public void testHowRateDropsMetrics() throws Exception {
        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 -1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 -1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure at bucket boundary (though the same issue exists for all 
        // periodic measurements)
        final double m1 = rate.measure(metricsConfig, t(2));    // m1 = 1.0

        // Third sample window from t2 -> (t3 -1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        // Measure second pair of samples:
        final double m2 = rate.measure(metricsConfig, t(4));    // m2 = 1.0

        assertEquals("Measurement of the rate over the first two samples", 
500.0, m1, 2.0);
        assertEquals("Measurement of the rate over the last two samples", 
500.0, m2, 2.0);
    }

    @Test
    public void testHowRateDropsMetricsWithRetardedObservations() throws 
Exception {
        final long retardation = 1000;

        Rate rate = new Rate();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // First sample window from t0 -> (t1 -1), with rate 999 per second:
        for (long time = t(0); time != t(1); time += 1000) {
            rate.record(metricsConfig, 999, time);
        }

        // Second sample window from t1 -> (t2 -1), with rate 1 per second:
        for (long time = t(1); time != t(2); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        double m1 = 0.0;

        // Third sample window from t2 -> (t3 -1), with rate 999 per second:
        for (long time = t(2); time != t(3); time += 1000) {
            rate.record(metricsConfig, 999, time);

            if (time == t(2) + retardation) {
                m1 = rate.measure(metricsConfig, time); // m1 = 65.something
            }
        }

        // Fourth sample window from t3 -> (t4 -1), with rate 1 per second:
        for (long time = t(3); time != t(4); time += 1000) {
            rate.record(metricsConfig, 1, time);
        }

        double m2 = 0.0;

        // Fifth sample window from t4 -> (t5 -1), with rate 999 per second:
        for (long time = t(4); time != t(5); time += 1000) {
            rate.record(metricsConfig, 999, time);

            if (time == t(4) + retardation) {
                m2 = rate.measure(metricsConfig, time); // m2 = 65.something
            }
        }

        assertTrue("Measurement of the rate over the first two samples should 
be well over 250 per sec", m1 > 250.0);
        assertTrue("Measurement of the rate over the last two samples should be 
well over 250 per sec", m2 > 250.0);
    }

    @Test
    public void testHowCountDropsMetrics() throws Exception {
        Count count = new Count();
        metricsConfig.samples(2);
        metricsConfig.timeWindow(30, TimeUnit.SECONDS);

        // Record count for constant rate of 1 per second over entire 60 second 
period:
        for (long time = t(0); time != t(2); time += 1000) {
            count.record(metricsConfig, 1, time);
        }

        final double m1 = count.measure(metricsConfig, t(2) -1);    // m1 = 60

        count.record(metricsConfig, 1, t(2));

        final double m2 = count.measure(metricsConfig, t(2));       // m2 = 31

        assertEquals("Measurement of the count at end of the 60 second period", 
60, m1, 0.1);
        assertEquals("Measurement of the count at 1ms after the 60 second 
period", 60, m2, 0.1);
    }
}
{code}

  was:
The metrics captured by Kafka through the in-house {{SampledStat}} suffer from 
misreporting metrics if observed in a periodic manner.

Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, to 
report this metric to some external system we might poll it every 60 seconds to 
observe the current value. Using a shorter period would, in the case of a 
{{Rate}}, lead to smoothing of the plotted data, and worse, in the case of a 
{{Count}}, would lead to double counting - so 60 seconds is the only period at 
which we can poll the metrics if we are to report accurate metrics.

To demonstrate the issue consider the following somewhat extreme case:

The {{Rate}}  is capturing data from a system which alternates between a 999 
per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
seconds the first sample within the {{Rate}} instance will have a rate of 999 
per sec, and the second 1 per sec. 

If we were to as the metric for its value at this 60 second boundary it would 
correctly report 500 per sec. However, if we asked it again 1 millisecond later 
it would report 1 per sec, as the first sample window has been aged out. 
Depending on how retarded into the 60 sec period of the metric our periodic 
poll of the metric was, we would observe a constant rate somewhere the range of 
1 to 500 per second, most likely around the 250 mark. 

Other metrics based off of the {{SampledStat}} type suffer from the same issue 
e.g. the {{Count}} metric, given a constant rate of 1 per second, will report a 
constant count somewhere between 30 and 60, rather than the correct 60.


> In-house KafkaMetric misreports metrics when periodically observed
> ------------------------------------------------------------------
>
>                 Key: KAFKA-3456
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3456
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer, core, producer 
>    Affects Versions: 0.9.0.0, 0.9.0.1, 0.10.0.0
>            Reporter: The Data Lorax
>            Assignee: Neha Narkhede
>            Priority: Minor
>
> The metrics captured by Kafka through the in-house {{SampledStat}} suffer 
> from misreporting metrics if observed in a periodic manner.
> Consider a {{Rate}} metric that is using the default 2 samples and 30 second 
> sample window i.e. the {{Rate}} is capturing 60 seconds worth of data.  So, 
> to report this metric to some external system we might poll it every 60 
> seconds to observe the current value. Using a shorter period would, in the 
> case of a {{Rate}}, lead to smoothing of the plotted data, and worse, in the 
> case of a {{Count}}, would lead to double counting - so 60 seconds is the 
> only period at which we can poll the metrics if we are to report accurate 
> metrics.
> To demonstrate the issue consider the following somewhat extreme case:
> The {{Rate}}  is capturing data from a system which alternates between a 999 
> per sec rate and a 1 per sec rate every 30 seconds, with the different rates 
> aligned with the sample boundaries within the {{Rate}} instance i.e. after 60 
> seconds the first sample within the {{Rate}} instance will have a rate of 999 
> per sec, and the second 1 per sec. 
> If we were to ask the metric for its value at this 60 second boundary it 
> would correctly report 500 per sec. However, if we asked it again 1 
> millisecond later it would report 1 per sec, as the first sample window has 
> been aged out. Depending on how retarded into the 60 sec period of the metric 
> our periodic poll of the metric was, we would observe a constant rate 
> somewhere in the range of 1 to 500 per second, most likely around the 250 
> mark. 
> Other metrics based off of the {{SampledStat}} type suffer from the same 
> issue e.g. the {{Count}} metric, given a constant rate of 1 per second, will 
> report a constant count somewhere between 30 and 60, rather than the correct 
> 60.
> This can be seen in the following test code:
> {code:java}
> public class MetricsTest {
>     private MetricConfig metricsConfig;
>     @Before
>     public void setUp() throws Exception {
>         metricsConfig = new MetricConfig();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(10, TimeUnit.SECONDS);
>     }
>     private long t(final int bucket) {
>         return metricsConfig.timeWindowMs() * bucket;
>     }
>     @Test
>     public void testHowRateDropsMetrics() throws Exception {
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure at bucket boundary, (though same issue exists all periodic 
> measurements)
>         final double m1 = rate.measure(metricsConfig, t(2));    // m1 = 1.0
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         // Measure second pair of samples:
>         final double m2 = rate.measure(metricsConfig, t(4));    // m2 = 1.0
>         assertEquals("Measurement of the rate over the first two samples", 
> 500.0, m1, 2.0);
>         assertEquals("Measurement of the rate over the last two samples", 
> 500.0, m2, 2.0);
>     }
>     @Test
>     public void testHowRateDropsMetricsWithRetardedObservations() throws 
> Exception {
>         final long retardation = 1000;
>         Rate rate = new Rate();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // First sample window from t0 -> (t1 -1), with rate 999 per second:
>         for (long time = t(0); time != t(1); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>         }
>         // Second sample window from t1 -> (t2 -1), with rate 1 per second:
>         for (long time = t(1); time != t(2); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m1 = 0.0;
>         // Third sample window from t2 -> (t3 -1), with rate 999 per second:
>         for (long time = t(2); time != t(3); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(2) + retardation) {
>                 m1 = rate.measure(metricsConfig, time); // // m1 = 
> 65.something
>             }
>         }
>         // Second sample window from t3 -> (t4 -1), with rate 1 per second:
>         for (long time = t(3); time != t(4); time += 1000) {
>             rate.record(metricsConfig, 1, time);
>         }
>         double m2 = 0.0;
>         // Fifth sample window from t4 -> (t5 -1), with rate 999 per second:
>         for (long time = t(4); time != t(5); time += 1000) {
>             rate.record(metricsConfig, 999, time);
>             if (time == t(4) + retardation) {
>                 m2 = rate.measure(metricsConfig, time); // m2 = 65.something
>             }
>         }
>         assertTrue("Measurement of the rate over the first two samples should 
> be well over 250 per sec", m1 > 250.0);
>         assertTrue("Measurement of the rate over the last two samples should 
> be well over 250 per sec", m2 > 250.0);
>     }
>     @Test
>     public void testHowCountDropsMetrics() throws Exception {
>         Count count = new Count();
>         metricsConfig.samples(2);
>         metricsConfig.timeWindow(30, TimeUnit.SECONDS);
>         // Record count for constant rate of 1 per second over entire 60 
> second period:
>         for (long time = t(0); time != t(2); time += 1000) {
>             count.record(metricsConfig, 1, time);
>         }
>         final double m1 = count.measure(metricsConfig, t(2) -1);    // m1 = 60
>         count.record(metricsConfig, 1, t(2));
>         final double m2 = count.measure(metricsConfig, t(2));       // m2 = 31
>         assertEquals("Measurement of the count at end of the 60 second 
> period", 60, m1, 0.1);
>         assertEquals("Measurement of the count at 1ms after the 60 second 
> period", 60, m2, 0.1);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to