This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7e07659b7a6 MINOR: Add comment why we use thread-id filtering when registering metrics for KIP-1076 (#19957)
7e07659b7a6 is described below
commit 7e07659b7a6f628a41ad770a277d123e167350a3
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Jun 12 17:20:52 2025 -0400
MINOR: Add comment why we use thread-id filtering when registering metrics
for KIP-1076 (#19957)
Adds a comment explaining why metrics registration is filtered by the
thread-id tag. The `StreamsMetric` object is a singleton shared by all
StreamThread instances, so each reporter must add only the metrics that
belong to its own StreamThread; otherwise duplicate metrics are
registered.
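For illustration only (not part of this commit): a minimal, self-contained
sketch of the filtering idea described above. It does not use the Kafka
classes. `ThreadIdFilterSketch`, `PerThreadReporter`, the tag key value
"thread-id", and the state updater thread names are all assumptions made
for the example. Each reporter is notified about every metric in a shared
registry and keeps only those tagged with its own thread id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class ThreadIdFilterSketch {

    // Assumed tag key, chosen for illustration only.
    static final String THREAD_ID_TAG = "thread-id";

    /** Hypothetical stand-in for a per-thread reporter; not the real Kafka class. */
    static final class PerThreadReporter {
        private final String threadId;
        private final String stateUpdaterThreadId;
        private final List<String> registered = new ArrayList<>();

        PerThreadReporter(final String threadId, final String stateUpdaterThreadId) {
            this.threadId = threadId;
            this.stateUpdaterThreadId = stateUpdaterThreadId;
        }

        // True only when the metric carries a thread-id tag that names this
        // thread or its (assumed) state updater thread.
        boolean tagMatches(final Map<String, String> tags) {
            final String tag = tags.get(THREAD_ID_TAG);
            return tag != null
                && (tag.equals(threadId) || tag.equals(stateUpdaterThreadId));
        }

        // Called once per metric in the shared registry; metrics that belong
        // to other threads, or that have no thread-id tag, are skipped.
        void metricChange(final String name, final Map<String, String> tags) {
            if (tagMatches(tags)) {
                registered.add(name);
            }
        }

        List<String> registered() {
            return registered;
        }
    }

    public static void main(final String[] args) {
        // One shared registry: every reporter sees every metric.
        final List<Map<String, String>> sharedTags = List.of(
            Map.of(THREAD_ID_TAG, "thread-1"),
            Map.of(THREAD_ID_TAG, "thread-2"),
            Map.of("client-id", "app"));

        final PerThreadReporter r1 =
            new PerThreadReporter("thread-1", "thread-1-state-updater");
        final PerThreadReporter r2 =
            new PerThreadReporter("thread-2", "thread-2-state-updater");

        for (final Map<String, String> tags : sharedTags) {
            r1.metricChange("process-rate", tags);
            r2.metricChange("process-rate", tags);
        }

        System.out.println("reporter 1 registered: " + r1.registered());
        System.out.println("reporter 2 registered: " + r2.registered());
    }
}
```

Without the tag check, both reporters in this sketch would register all
three metrics, which is the duplication the commit message describes.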
Reviewers: Matthias Sax <[email protected]>
---
.../internals/metrics/StreamsThreadMetricsDelegatingReporter.java | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
index 65b7990dfe0..1c6ce0a2035 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -58,6 +58,13 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
}
}
+ /*
+ The StreamMetrics object is a singleton shared by all StreamThread instances.
+ So we need to make sure we only pass metrics for the current StreamThread that contains this
+ MetricsReporter instance, which will register metrics with the embedded KafkaConsumer to pass
+ through the telemetry pipeline.
+ Otherwise, Kafka Streams would register multiple metrics for all StreamThreads.
+ */
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) &&
(tags.get(THREAD_ID_TAG).equals(threadId) ||