[ https://issues.apache.org/jira/browse/KAFKA-7240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16592027#comment-16592027 ]

ASF GitHub Bot commented on KAFKA-7240:
---------------------------------------

guozhangwang closed pull request #5467: KAFKA-7240: -total metrics in Streams are incorrect
URL: https://github.com/apache/kafka/pull/5467
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 79df5d158a1..7f3d31fd776 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -41,6 +41,7 @@
 import org.apache.kafka.streams.processor.Punctuator;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
@@ -109,7 +110,7 @@
             );
             parent.add(
                 new MetricName("commit-total", group, "The total number of 
occurrence of commit operations.", allTagMap),
-                new Count()
+                new CumulativeCount()
             );
 
             // add the operation metrics with additional tags
@@ -129,7 +130,7 @@
             );
             taskCommitTimeSensor.add(
                 new MetricName("commit-total", group, "The total number of 
occurrence of commit operations.", tagMap),
-                new Count()
+                new CumulativeCount()
             );
 
             // add the metrics for enforced processing
@@ -140,7 +141,7 @@
             );
             taskEnforcedProcessSensor.add(
                     new MetricName("enforced-process-total", group, "The total 
number of occurrence of enforced-process operations.", tagMap),
-                    new Count()
+                    new CumulativeCount()
             );
 
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index efd94eaf637..28cedbe4197 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -44,6 +44,7 @@
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TaskMetadata;
 import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.processor.internals.metrics.CumulativeCount;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 import org.slf4j.Logger;
@@ -437,7 +438,7 @@ StreamTask createTask(final Consumer<byte[], byte[]> 
consumer,
                 cache,
                 time,
                 () -> createProducer(taskId),
-                streamsMetrics.tasksClosedSensor);
+                streamsMetrics.taskClosedSensor);
         }
 
         private Producer<byte[], byte[]> createProducer(final TaskId id) {
@@ -518,7 +519,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> 
consumer,
         private final Sensor processTimeSensor;
         private final Sensor punctuateTimeSensor;
         private final Sensor taskCreatedSensor;
-        private final Sensor tasksClosedSensor;
+        private final Sensor taskClosedSensor;
 
         StreamsMetricsThreadImpl(final Metrics metrics, final String 
threadName) {
             super(metrics, threadName);
@@ -532,7 +533,7 @@ StandbyTask createTask(final Consumer<byte[], byte[]> 
consumer,
             addAvgMaxLatency(pollTimeSensor, group, tagMap(), "poll");
             // can't use addInvocationRateAndCount due to non-standard 
description string
             pollTimeSensor.add(metrics.metricName("poll-rate", group, "The 
average per-second number of record-poll calls", tagMap()), new 
Rate(TimeUnit.SECONDS, new Count()));
-            pollTimeSensor.add(metrics.metricName("poll-total", group, "The 
total number of record-poll calls", tagMap()), new Count());
+            pollTimeSensor.add(metrics.metricName("poll-total", group, "The 
total number of record-poll calls", tagMap()), new CumulativeCount());
 
             processTimeSensor = threadLevelSensor("process-latency", 
Sensor.RecordingLevel.INFO);
             addAvgMaxLatency(processTimeSensor, group, tagMap(), "process");
@@ -546,9 +547,9 @@ StandbyTask createTask(final Consumer<byte[], byte[]> 
consumer,
             taskCreatedSensor.add(metrics.metricName("task-created-rate", 
"stream-metrics", "The average per-second number of newly created tasks", 
tagMap()), new Rate(TimeUnit.SECONDS, new Count()));
             taskCreatedSensor.add(metrics.metricName("task-created-total", 
"stream-metrics", "The total number of newly created tasks", tagMap()), new 
Total());
 
-            tasksClosedSensor = threadLevelSensor("task-closed", 
Sensor.RecordingLevel.INFO);
-            tasksClosedSensor.add(metrics.metricName("task-closed-rate", 
group, "The average per-second number of closed tasks", tagMap()), new 
Rate(TimeUnit.SECONDS, new Count()));
-            tasksClosedSensor.add(metrics.metricName("task-closed-total", 
group, "The total number of closed tasks", tagMap()), new Total());
+            taskClosedSensor = threadLevelSensor("task-closed", 
Sensor.RecordingLevel.INFO);
+            taskClosedSensor.add(metrics.metricName("task-closed-rate", group, 
"The average per-second number of closed tasks", tagMap()), new 
Rate(TimeUnit.SECONDS, new Count()));
+            taskClosedSensor.add(metrics.metricName("task-closed-total", 
group, "The total number of closed tasks", tagMap()), new Total());
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
new file mode 100644
index 00000000000..2c12c2b6e9d
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/CumulativeCount.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.metrics;
+
+import org.apache.kafka.common.metrics.MeasurableStat;
+import org.apache.kafka.common.metrics.MetricConfig;
+
+/**
+ * A non-SampledStat version of Count for measuring -total metrics in streams
+ */
+public class CumulativeCount implements MeasurableStat {
+
+    private double count = 0.0;
+
+    @Override
+    public void record(final MetricConfig config, final double value, final 
long timeMs) {
+        count += 1;
+    }
+
+    @Override
+    public double measure(final MetricConfig config, final long now) {
+        return count;
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 56166a4d372..170311238ec 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -398,7 +398,7 @@ public static void addInvocationRateAndCount(final Sensor 
sensor,
                 "The total number of occurrence of " + operation + " 
operations.",
                 tags
             ),
-            new Count()
+            new CumulativeCount()
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
index b065e2ca7b5..7ce27b4b6d6 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetricsImplTest.java
@@ -17,11 +17,17 @@
 package org.apache.kafka.streams.processor.internals;
 
 
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.metrics.KafkaMetric;
+import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.junit.Test;
 
+import java.util.concurrent.TimeUnit;
+
 import static org.junit.Assert.assertEquals;
 
 public class StreamsMetricsImplTest {
@@ -96,4 +102,42 @@ public void testThroughputMetrics() {
         streamsMetrics.removeSensor(sensor1);
         assertEquals(defaultMetrics, streamsMetrics.metrics().size());
     }
+
+    @Test
+    public void testTotalMetricDoesntDecrease() {
+        final MockTime time = new MockTime(1);
+        final MetricConfig config = new MetricConfig().timeWindow(1, 
TimeUnit.MILLISECONDS);
+        final Metrics metrics = new Metrics(config, time);
+        final StreamsMetricsImpl streamsMetrics = new 
StreamsMetricsImpl(metrics, "");
+
+        final String scope = "scope";
+        final String entity = "entity";
+        final String operation = "op";
+
+        final Sensor sensor = streamsMetrics.addLatencyAndThroughputSensor(
+                scope,
+                entity,
+                operation,
+                Sensor.RecordingLevel.INFO
+        );
+
+        final double latency = 100.0;
+        final MetricName totalMetricName = metrics.metricName(
+                "op-total",
+                "stream-scope-metrics",
+                "",
+                "client-id",
+                "",
+                "scope-id",
+                "entity"
+        );
+
+        final KafkaMetric totalMetric = metrics.metric(totalMetricName);
+
+        for (int i = 0; i < 10; i++) {
+            assertEquals(i, 
Math.round(totalMetric.measurable().measure(config, time.milliseconds())));
+            sensor.record(latency, time.milliseconds());
+        }
+
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> -total metrics in Streams are incorrect
> ---------------------------------------
>
>                 Key: KAFKA-7240
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7240
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>    Affects Versions: 2.0.0
>            Reporter: Sam Lendle
>            Assignee: Sam Lendle
>            Priority: Major
>             Fix For: 2.1.0
>
>
> I noticed the values of total metrics for streams were decreasing 
> periodically when viewed in JMX, for example process-total for each 
> processor-node-id under stream-processor-node-metrics. 
> Edit: For processor node metrics, I should have been looking at 
> ProcessorNode, not StreamsMetricsThreadImpl.
> -Looking at StreamsMetricsThreadImpl, I believe this behavior is due to 
> using Count() as the Stat for the *-total metrics. Count() is a SampledStat, 
> so the value it reports is the count in recent time windows, and the value 
> decreases whenever a window is purged.-
> ----
> -This explains the behavior I saw, but I think the issue is deeper. For 
> example, processTimeSensor attempts to measure process-latency-avg, 
> process-latency-max, process-rate, and process-total. For that sensor, record 
> is called as follows:-
> -streamsMetrics.processTimeSensor.record(computeLatency() / (double) 
> processed, timerStartedMs);-
> -so, if I understand correctly, the value passed to record is the average 
> latency per processed message in this batch. That value is passed through to 
> Count#record, which increments its count by 1 and ignores the value parameter. 
> What a stat recording the total would actually need to know is the number of 
> messages processed. Because of that, I don't think it's possible for one 
> Sensor to measure both latency and total.-
> -That said, it's not clear to me how all the different Stats work or how 
> exactly Sensors work. For similar reasons, I don't understand how the 
> process-rate metric works, even though it seems to be correct, so I may be 
> missing something here.-
>   
> cc [~guozhang]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to