[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592030#comment-16592030 ] Guozhang Wang commented on KAFKA-7240:

Thanks for your contribution [~slendle]!!

> -total metrics in Streams are incorrect
> ---
>
> Key: KAFKA-7240
> URL: https://issues.apache.org/jira/browse/KAFKA-7240
> Project: Kafka
> Issue Type: Bug
> Components: metrics, streams
> Affects Versions: 2.0.0
> Reporter: Sam Lendle
> Assignee: Sam Lendle
> Priority: Major
> Fix For: 2.1.0
>
>
> I noticed the values of total metrics for streams were decreasing
> periodically when viewed in JMX, for example process-total for each
> processor-node-id under stream-processor-node-metrics.
> Edit: For processor node metrics, I should have been looking at
> ProcessorNode, not StreamsMetricsThreadImpl.
> -Looking at StreamsMetricsThreadImpl, I believe this behavior is due to
> using Count() as the Stat for the *-total metrics. Count() is a SampledStat,
> so the value it reports is the count in recent time windows, and the value
> decreases whenever a window is purged.-
>
> -This explains the behavior I saw, but I think the issue is deeper. For
> example, processTimeSensor attempts to measure process-latency-avg,
> process-latency-max, process-rate, and process-total. For that sensor, record
> is called like-
> -streamsMetrics.processTimeSensor.record(computeLatency() / (double)
> processed, timerStartedMs);-
> -so the value passed to record is the average latency per processed message in
> this batch, if I understand correctly. That gets pushed through to the call to
> Count#record, which increments its count by 1, ignoring the value parameter.
> Whatever stat is recording the total would need to know the number of
> messages processed. Because of that, I don't think it's possible for one
> Sensor to measure both latency and total.-
> -That said, it's not clear to me how all the different Stats work and how
> exactly Sensors work, and I don't actually understand how the process-rate
> metric is working, for similar reasons, but it seems to be correct, so I may
> be missing something here.-
>
> cc [~guozhang]

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
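To see why a total backed by a SampledStat can go down, here is a simplified, standalone Java model of a windowed count. It is not Kafka's SampledStat code: the class name `WindowedCount`, the window settings, and the purge rule (drop any window older than numWindows * windowMs) are assumptions chosen to mirror the behavior described in the report.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Entry point first, so `java SampledCountDemo.java` (single-file launch) runs it.
class SampledCountDemo {
    public static void main(String[] args) {
        // Hypothetical settings: keep 2 windows of 30 seconds each.
        WindowedCount count = new WindowedCount(2, 30_000L);
        count.record(1.0, 0L);       // opens the window starting at t=0s
        count.record(1.0, 1_000L);   // lands in the same window
        count.record(1.0, 35_000L);  // opens a second window at t=35s
        System.out.println(count.measure(40_000L)); // 3.0: both windows still retained
        System.out.println(count.measure(65_000L)); // 1.0: the t=0s window has expired
    }
}

// Simplified model of a windowed ("sampled") count; NOT Kafka's SampledStat code.
class WindowedCount {
    private static final class Window {
        final long startMs;
        long events;

        Window(long startMs) {
            this.startMs = startMs;
        }
    }

    private final int numWindows;
    private final long windowMs;
    private final Deque<Window> windows = new ArrayDeque<>();

    WindowedCount(int numWindows, long windowMs) {
        this.numWindows = numWindows;
        this.windowMs = windowMs;
    }

    // Each call counts as one event; the value argument is ignored, as in a count stat.
    void record(double value, long timeMs) {
        Window current = windows.peekLast();
        if (current == null || timeMs - current.startMs >= windowMs) {
            current = new Window(timeMs);
            windows.addLast(current);
            if (windows.size() > numWindows) {
                windows.removeFirst(); // at most numWindows windows are kept
            }
        }
        current.events++;
    }

    // Purges windows older than numWindows * windowMs, then sums what is left.
    double measure(long nowMs) {
        long maxAgeMs = numWindows * windowMs;
        windows.removeIf(w -> nowMs - w.startMs >= maxAgeMs);
        long total = 0;
        for (Window w : windows) {
            total += w.events;
        }
        return total;
    }
}
```

With two 30-second windows, all three events are visible at t=40s, but once the first window ages out at t=65s the reported count drops to 1, the same kind of drop that shows up for a *-total metric in JMX.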
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16592027#comment-16592027 ] ASF GitHub Bot commented on KAFKA-7240:

guozhangwang closed pull request #5467: KAFKA-7240: -total metrics in Streams are incorrect
URL: https://github.com/apache/kafka/pull/5467

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 79df5d158a1..7f3d31fd776 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -109,7 +110,7 @@
 );
 parent.add(
 new MetricName("commit-total", group, "The total number of occurrence of commit operations.", allTagMap),
-new Count()
+new CumulativeCount()
 );
 // add the operation metrics with additional tags
@@ -129,7 +130,7 @@
 );
 taskCommitTimeSensor.add(
 new MetricName("commit-total", group, "The total number of occurrence of commit operations.", tagMap),
-new Count()
+new CumulativeCount()
 );
 // add the metrics for enforced processing
@@ -140,7 +141,7 @@
 );
 taskEnforcedProcessSensor.add(
 new MetricName("enforced-process-total", group, "The total number of occurrence of enforced-process operations.", tagMap),
-new Count()
+new CumulativeCount()
 );
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index efd94eaf637..28cedbe4197 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
@@ -437,7 +438,7 @@ StreamTask createTask(final Consumer consumer,
 cache,
 time,
 () -> createProducer(taskId),
-streamsMetrics.tasksClosedSensor);
+streamsMetrics.taskClosedSensor);
 }
 private Producer createProducer(final TaskId id) {
@@ -518,7 +519,7 @@ StandbyTask createTask(final Consumer consumer,
 private final Sensor processTimeSensor;
 private final Sensor punctuateTimeSensor;
 private final Sensor taskCreatedSensor;
-private final Sensor tasksClosedSensor;
+private final Sensor taskClosedSensor;
 StreamsMetricsThreadImpl(final Metrics metrics, final String threadName) {
 super(metrics, threadName);
@@ -532,7 +533,7 @@ StandbyTask createTask(final Consumer consumer,
 addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
 // can't use addInvocationRateAndCount due to non-standard description string
 pollTimeSensor.add(metrics.metricName("poll-rate", group, "The average per-second number of record-poll calls", tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
-pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new Count());
+pollTimeSensor.add(metrics.metricName("poll-total", group, "The total number of record-poll calls", tagMap()), new CumulativeCount());
 processTimeSensor = threadLevelSensor("process-latency", Sensor.RecordingLevel.INFO);
 addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
@@ -546,9 +547,9 @@
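The diff swaps `Count()` for `CumulativeCount` from `org.apache.kafka.streams.processor.internals.metrics`, but the body of that class is not part of the (truncated) diff. The standalone sketch below illustrates the idea under stated assumptions: `SimpleStat` is a hypothetical stand-in for Kafka's `MeasurableStat` (whose `record` and `measure` methods also take a `MetricConfig` argument), and `CumulativeCountSketch` is an illustrative name, not the committed class.

```java
// Entry point first, so `java CumulativeCountDemo.java` (single-file launch) runs it.
class CumulativeCountDemo {
    public static void main(String[] args) {
        SimpleStat commitTotal = new CumulativeCountSketch();
        commitTotal.record(12.5, 0L);       // e.g. a commit latency in ms (ignored)
        commitTotal.record(8.0, 1_000L);
        commitTotal.record(9.1, 35_000L);
        System.out.println(commitTotal.measure(40_000L));    // 3.0
        System.out.println(commitTotal.measure(3_600_000L)); // still 3.0 an hour later
    }
}

// Hypothetical stand-in for Kafka's MeasurableStat, without the MetricConfig argument.
interface SimpleStat {
    void record(double value, long timeMs);

    double measure(long nowMs);
}

// Cumulative count: one increment per record() call, never reset or purged.
class CumulativeCountSketch implements SimpleStat {
    private long count = 0;

    @Override
    public void record(double value, long timeMs) {
        count += 1; // the recorded value (e.g. a latency) is deliberately ignored
    }

    @Override
    public double measure(long nowMs) {
        return count; // independent of nowMs: there are no time windows to expire
    }
}
```

Because nothing ever expires, `measure` returns the same value no matter how much time passes, which is what a `*-total` metric needs, while the `*-rate` metrics can keep using sampled stats such as `new Rate(TimeUnit.SECONDS, new Count())`.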
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16572129#comment-16572129 ] Matthias J. Sax commented on KAFKA-7240:

PR did not get auto-linked: https://github.com/apache/kafka/pull/5467
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568560#comment-16568560 ] John Roesler commented on KAFKA-7240:

[~slendle] For some reason, I can't assign this ticket to you... maybe you need some additional permission?
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568557#comment-16568557 ] John Roesler commented on KAFKA-7240:

Oh, by all means, go ahead! I'll assign it to you. If you don't want to do a KIP, just declare the non-metered count metric somewhere in a `streams...internals` package (that way, it wouldn't be a public interface change). FYI, we're getting close to merging https://github.com/apache/kafka/pull/5450, which would affect your changeset, so you might want to either watch that PR and wait for the merge, or base your change on that branch.
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568457#comment-16568457 ] Sam Lendle commented on KAFKA-7240:

[~vvcephei] I'd like to work on it if I can avoid a KIP :) I'll take a stab at a PR.
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16568382#comment-16568382 ] John Roesler commented on KAFKA-7240:

Thanks for the report [~slendle]. That does indeed sound like a mistake! Just a note: we'd need a KIP to add a non-sampled Count metric to the `org.apache.kafka.common.metrics.stats` package. We could consider adding an internal implementation in Streams without a KIP, though...
[jira] [Commented] (KAFKA-7240) -total metrics in Streams are incorrect
[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567632#comment-16567632 ] Sam Lendle commented on KAFKA-7240:

I believe at least part of the issue is in [StreamsMetricsImpl#addThroughputMetrics|https://github.com/apache/kafka/blob/ee5cc974d2ef449444861d82e1793668184ca86f/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java#L352], which uses Count(). Count() is a SampledStat, so the value it reports is the count in recent time windows, and the value decreases whenever a window is purged. I think the fix there would be to use a non-SampledStat version of Count(), in the same way that Total() is the non-sampled counterpart of Rate.SampledTotal().
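The analogy in the comment above can be made concrete: a sum stat adds the recorded value, while a count stat ignores it and adds one per call. The standalone sketch below uses hypothetical classes `RunningSum` and `RunningCount` (not Kafka classes) to show why a cumulative sum such as Total() cannot simply stand in for a `*-total` counter on a sensor that records latencies.

```java
// Entry point first, so `java TotalVersusCountDemo.java` (single-file launch) runs it.
class TotalVersusCountDemo {
    public static void main(String[] args) {
        double[] latenciesMs = {5.0, 7.0, 3.0}; // hypothetical values recorded on a latency sensor
        RunningSum sum = new RunningSum();
        RunningCount count = new RunningCount();
        for (double latency : latenciesMs) {
            sum.record(latency);
            count.record(latency);
        }
        System.out.println(sum.value());   // 15.0: sum of the recorded values
        System.out.println(count.value()); // 3.0: number of record() calls
    }
}

// Hypothetical cumulative sum, playing the role Total() plays next to SampledTotal.
class RunningSum {
    private double sum = 0.0;

    void record(double value) {
        sum += value;
    }

    double value() {
        return sum;
    }
}

// Hypothetical cumulative count: the value argument is ignored, one per call.
class RunningCount {
    private double count = 0.0;

    void record(double value) {
        count += 1.0;
    }

    double value() {
        return count;
    }
}
```

Recording three latencies of 5, 7 and 3 ms yields a sum of 15.0 but a count of 3.0; only the latter is the number of operations that a metric such as `poll-total` ("The total number of record-poll calls") is meant to report.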