This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e07659b7a6 MINOR: Add comment why we use thread-id filtering when 
registering metrics for KIP-1076 (#19957)
7e07659b7a6 is described below

commit 7e07659b7a6f628a41ad770a277d123e167350a3
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Jun 12 17:20:52 2025 -0400

    MINOR: Add comment why we use thread-id filtering when registering metrics 
for KIP-1076 (#19957)
    
    Adding a descriptive comment why it's necessary to filter metrics
    registration by thread-id tags.  This is due to the fact that the
    `StreamsMetric` is a singleton shared by all StreamThread instances, so
    we need to make sure only add metrics for the current StreamThread
    otherwise duplicate metrics are registered.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../internals/metrics/StreamsThreadMetricsDelegatingReporter.java  | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
index 65b7990dfe0..1c6ce0a2035 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -58,6 +58,13 @@ public class StreamsThreadMetricsDelegatingReporter 
implements MetricsReporter {
         }
     }
 
+    /*
+       The StreamMetrics object is a singleton shared by all StreamThread 
instances.
+       So we need to make sure we only pass metrics for the current 
StreamThread that contains this
+       MetricsReporter instance, which will register metrics with the embedded 
KafkaConsumer to pass
+       through the telemetry pipeline.
+       Otherwise, Kafka Streams would register multiple metrics for all 
StreamThreads.
+     */
     private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric 
metric) {
         final Map<String, String> tags = metric.metricName().tags();
         final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && 
(tags.get(THREAD_ID_TAG).equals(threadId) ||

Reply via email to