This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 87a299a0810 KAFKA-18826: Add global thread metrics  (#18953)
87a299a0810 is described below

commit 87a299a08103df50c8ceb344b15359ea91c1bf23
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Apr 2 09:17:49 2025 -0400

    KAFKA-18826: Add global thread metrics  (#18953)
    
    When implementing
    [KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics),
    metrics for the Global Stream thread were overlooked. This ticket adds
    the Global Thread metrics so they are available via the KIP-1076 process
    of adding external Kafka metrics.
    
    The existing integration test has been updated to confirm GlobalThread
    metrics are sent via the broker plugin.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../KafkaStreamsTelemetryIntegrationTest.java      | 129 +++++++++++++++++----
 .../StreamsThreadMetricsDelegatingReporter.java    |  10 +-
 .../processor/internals/GlobalStreamThread.java    |   4 +
 .../streams/processor/internals/StreamThread.java  |   2 +-
 ...StreamsThreadMetricsDelegatingReporterTest.java |   3 +-
 5 files changed, 120 insertions(+), 28 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 750b2381a1a..63ef1eb4ea8 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -48,6 +48,12 @@ import 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.TestUtils;
 import org.apache.kafka.tools.ClientMetricsCommand;
 
@@ -98,8 +104,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
     private String outputTopicTwoPartitions;
     private String inputTopicOnePartition;
     private String outputTopicOnePartition;
+    private String globalStoreTopic;
+    private Uuid globalStoreConsumerInstanceId;
     private Properties streamsApplicationProperties = new Properties();
     private Properties streamsSecondApplicationProperties = new Properties();
+    private  KeyValueIterator<String, String> globalStoreIterator;
 
     private static EmbeddedKafkaCluster cluster;
     private static final List<TestingMetricsInterceptingConsumer<byte[], 
byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
@@ -125,10 +134,12 @@ public class KafkaStreamsTelemetryIntegrationTest {
         outputTopicTwoPartitions = appId + "-output-two";
         inputTopicOnePartition = appId + "-input-one";
         outputTopicOnePartition = appId + "-output-one";
+        globalStoreTopic = appId + "-global-store";
         cluster.createTopic(inputTopicTwoPartitions, 2, 1);
         cluster.createTopic(outputTopicTwoPartitions, 2, 1);
         cluster.createTopic(inputTopicOnePartition, 1, 1);
         cluster.createTopic(outputTopicOnePartition, 1, 1);
+        cluster.createTopic(globalStoreTopic, 2, 1);
     }
 
     @AfterAll
@@ -144,6 +155,47 @@ public class KafkaStreamsTelemetryIntegrationTest {
         if (!streamsSecondApplicationProperties.isEmpty()) {
             
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
         }
+        if (globalStoreIterator != null) {
+            globalStoreIterator.close();
+        }
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
+    public void shouldPushGlobalThreadMetricsToBroker(final String 
recordingLevel) throws Exception {
+        streamsApplicationProperties = props(true);
+        
streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, 
recordingLevel);
+        final Topology topology = simpleTopology(true);
+        subscribeForStreamsMetrics();
+        try (final KafkaStreams streams = new KafkaStreams(topology, 
streamsApplicationProperties)) {
+            IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+            final ClientInstanceIds clientInstanceIds = 
streams.clientInstanceIds(Duration.ofSeconds(60));
+            for (final Map.Entry<String, Uuid> instanceId : 
clientInstanceIds.consumerInstanceIds().entrySet()) {
+                final String instanceIdKey = instanceId.getKey();
+                if 
(instanceIdKey.endsWith("GlobalStreamThread-global-consumer")) {
+                    globalStoreConsumerInstanceId = instanceId.getValue();
+                }
+            }
+
+            assertNotNull(globalStoreConsumerInstanceId);
+            LOG.info("Global consumer instance id {}", 
globalStoreConsumerInstanceId);
+            TestUtils.waitForCondition(
+                    () -> 
!TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(globalStoreConsumerInstanceId, 
Collections.emptyList()).isEmpty(),
+                    30_000,
+                    "Never received subscribed metrics"
+            );
+
+            final List<String> expectedGlobalMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id") &&
+                            
metricName.tags().get("thread-id").endsWith("-GlobalStreamThread")).map(mn -> {
+                                final String name = mn.name().replace('-', 
'.');
+                                final String group = 
mn.group().replace("-metrics", "").replace('-', '.');
+                                return "org.apache.kafka." + group + "." + 
name;
+                            }).filter(name -> 
!name.equals("org.apache.kafka.stream.thread.state"))// telemetry reporter 
filters out string metrics
+                    .sorted().toList();
+            final List<String> actualGlobalMetrics = new 
ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(globalStoreConsumerInstanceId));
+            assertEquals(expectedGlobalMetrics, actualGlobalMetrics);
+        }
     }
 
     @ParameterizedTest
@@ -152,7 +204,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
         // End-to-end test validating metrics pushed to broker
         streamsApplicationProperties  = props(true);
         
streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, 
recordingLevel);
-        final Topology topology = simpleTopology();
+        final Topology topology = simpleTopology(false);
         subscribeForStreamsMetrics();
         try (final KafkaStreams streams = new KafkaStreams(topology, 
streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
@@ -215,21 +267,21 @@ public class KafkaStreamsTelemetryIntegrationTest {
     public void shouldPassMetrics(final String topologyType, final boolean 
stateUpdaterEnabled) throws Exception {
         // Streams metrics should get passed to Admin and Consumer
         streamsApplicationProperties = props(stateUpdaterEnabled);
-        final Topology topology = topologyType.equals("simple") ? 
simpleTopology() : complexTopology();
+        final Topology topology = topologyType.equals("simple") ? 
simpleTopology(false) : complexTopology();
        
         try (final KafkaStreams streams = new KafkaStreams(topology, 
streamsApplicationProperties)) {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             final List<MetricName> streamsThreadMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).toList();
 
             final List<MetricName> streamsClientMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).toList();
 
 
 
-            final List<MetricName> consumerPassedStreamThreadMetricNames = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
-            final List<MetricName> adminPassedStreamClientMetricNames = 
INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+            final List<MetricName> consumerPassedStreamThreadMetricNames = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
+            final List<MetricName> adminPassedStreamClientMetricNames = 
INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
 
 
             assertEquals(streamsThreadMetrics.size(), 
consumerPassedStreamThreadMetricNames.size());
@@ -259,10 +311,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
             
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
 
             final List<MetricName> streamsTaskMetricNames = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
 
             final List<MetricName> consumerPassedStreamTaskMetricNames = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
 
             /*
              With only one instance, Kafka Streams should register task 
metrics for all tasks 0_0, 0_1, 1_0, 1_1
@@ -293,24 +345,24 @@ public class KafkaStreamsTelemetryIntegrationTest {
                 );
 
                 final List<MetricName> streamsOneTaskMetrics = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> streamsOneStateMetrics = 
streamsOne.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
                 
                 final List<MetricName> consumerOnePassedTaskMetrics = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> consumerOnePassedStateMetrics = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
 
                 final List<MetricName> streamsTwoTaskMetrics = 
streamsTwo.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> streamsTwoStateMetrics = 
streamsTwo.metrics().values().stream().map(Metric::metricName)
-                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        .filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
                 
                 final List<MetricName> consumerTwoPassedTaskMetrics = 
INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.tags().containsKey("task-id")).toList();
                 final List<MetricName> consumerTwoPassedStateMetrics = 
INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
-                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+                        
.passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> 
metricName.group().equals("stream-state-metrics")).toList();
                 /*
                  Confirm pre-existing KafkaStreams instance one only passes 
metrics for its tasks and has no metrics for previous tasks
                  */
@@ -350,10 +402,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
             IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
 
             final List<MetricName> streamsThreadMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.tags().containsKey("thread-id")).toList();
 
             final List<MetricName> streamsClientMetrics = 
streams.metrics().values().stream().map(Metric::metricName)
-                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+                    .filter(metricName -> 
metricName.group().equals("stream-metrics")).toList();
 
             final Map<MetricName, ? extends Metric> embeddedConsumerMetrics = 
INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).metrics();
             final Map<MetricName, ? extends Metric> embeddedAdminMetrics = 
INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).metrics();
@@ -419,8 +471,41 @@ public class KafkaStreamsTelemetryIntegrationTest {
         return builder.build();
     }
 
-    private Topology simpleTopology() {
+
+    private void addGlobalStore(final StreamsBuilder builder) {
+        builder.addGlobalStore(
+            Stores.keyValueStoreBuilder(
+                Stores.inMemoryKeyValueStore("iq-test-store"),
+                Serdes.String(),
+                Serdes.String()
+        ),
+                globalStoreTopic,
+                Consumed.with(Serdes.String(), Serdes.String()),
+                () -> new Processor<>() {
+
+                    // The store iterator is intentionally not closed here as 
it needs
+                    // to be open during the test, so the Streams app will 
emit the
+                    // 
org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric
+                    // that is expected. So the globalStoreIterator is a 
global variable
+                    // (pun not intended), so it can be closed in the tearDown 
method.
+                    @SuppressWarnings("unchecked")
+                    @Override
+                    public void init(final ProcessorContext<Void, Void> 
context) {
+                        globalStoreIterator = ((KeyValueStore<String, String>) 
context.getStateStore("iq-test-store")).all();
+                    }
+
+                    @Override
+                    public void process(final Record<String, String> record) {
+                        // no-op
+                    }
+                });
+    }
+
+    private Topology simpleTopology(final boolean includeGlobalStore) {
         final StreamsBuilder builder = new StreamsBuilder();
+        if (includeGlobalStore) {
+            addGlobalStore(builder);
+        }
         builder.stream(inputTopicOnePartition, Consumed.with(Serdes.String(), 
Serdes.String()))
                 .flatMapValues(value -> 
Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
                 .to(outputTopicOnePartition, Produced.with(Serdes.String(), 
Serdes.String()));
@@ -449,7 +534,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
 
         @Override
         public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, 
Object> config) {
-            return new KafkaConsumer<>(config, new ByteArrayDeserializer(), 
new ByteArrayDeserializer());
+            return new TestingMetricsInterceptingConsumer<>(config, new 
ByteArrayDeserializer(), new ByteArrayDeserializer());
         }
 
         @Override
@@ -525,7 +610,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
                         .stream()
                         .flatMap(rm -> rm.getScopeMetricsList().stream())
                         .flatMap(sm -> sm.getMetricsList().stream())
-                        .map(metric -> metric.getGauge())
+                        
.map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)
                         .flatMap(gauge -> gauge.getDataPointsList().stream())
                         .flatMap(numberDataPoint -> 
numberDataPoint.getAttributesList().stream())
                         .filter(keyValue -> 
keyValue.getKey().equals("process_id"))
@@ -539,7 +624,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
                         .stream()
                         .flatMap(rm -> rm.getScopeMetricsList().stream())
                         .flatMap(sm -> sm.getMetricsList().stream())
-                        .map(metric -> metric.getName())
+                        
.map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getName)
                         .sorted()
                         .collect(Collectors.toList());
                 LOG.info("Found metrics {} for clientId={}", metricNames, 
clientId);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
index 0e2a238a29a..65b7990dfe0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter 
{
     
@@ -34,13 +35,13 @@ public class StreamsThreadMetricsDelegatingReporter 
implements MetricsReporter {
     private static final String THREAD_ID_TAG = "thread-id";
     private final Consumer<byte[], byte[]> consumer;
     private final String threadId;
-    private final String stateUpdaterThreadId;
+    private final Optional<String> stateUpdaterThreadId;
 
 
-    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+    public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], 
byte[]> consumer, final String threadId, final Optional<String> 
stateUpdaterThreadId) {
         this.consumer = Objects.requireNonNull(consumer);
         this.threadId = Objects.requireNonNull(threadId);
-        this.stateUpdaterThreadId = 
Objects.requireNonNull(stateUpdaterThreadId);
+        this.stateUpdaterThreadId = stateUpdaterThreadId;
         log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId 
{}", threadId, stateUpdaterThreadId);
     }
 
@@ -59,7 +60,8 @@ public class StreamsThreadMetricsDelegatingReporter 
implements MetricsReporter {
 
     private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric 
metric) {
         final Map<String, String> tags = metric.metricName().tags();
-        final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && 
(tags.get(THREAD_ID_TAG).equals(threadId) || 
tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
+        final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && 
(tags.get(THREAD_ID_TAG).equals(threadId) ||
+                
Optional.ofNullable(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
         if (!shouldInclude) {
             log.trace("Rejecting metric {}", metric.metricName());
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index aeed9dd332f..173ebdb8d4b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
+import 
org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -43,6 +44,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
@@ -390,6 +392,8 @@ public class GlobalStreamThread extends Thread {
                 time
             );
             stateMgr.setGlobalProcessorContext(globalProcessorContext);
+            final StreamsThreadMetricsDelegatingReporter globalMetricsReporter 
= new StreamsThreadMetricsDelegatingReporter(globalConsumer, getName(), 
Optional.empty());
+            
streamsMetrics.metricsRegistry().addReporter(globalMetricsReporter);
 
             stateConsumer = new StateConsumer(
                 logContext,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f505fab1554..74e7a27ceb9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -482,7 +482,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
         taskManager.setMainConsumer(mainConsumer);
         referenceContainer.mainConsumer = mainConsumer;
 
-        final StreamsThreadMetricsDelegatingReporter reporter = new 
StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId);
+        final StreamsThreadMetricsDelegatingReporter reporter = new 
StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, 
Optional.of(stateUpdaterId));
         streamsMetrics.metricsRegistry().addReporter(reporter);
 
         final StreamThread streamThread = new StreamThread(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
index faf30334e73..6cdfa442b34 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -63,7 +64,7 @@ class StreamsThreadMetricsDelegatingReporterTest {
         noThreadIdTagMap.put("client-id", "foo");
 
         mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name());
-        streamsThreadMetricsDelegatingReporter = new 
StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId);
+        streamsThreadMetricsDelegatingReporter = new 
StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, 
Optional.of(stateUpdaterId));
 
         final MetricName metricNameOne = new MetricName("metric-one", 
"test-group-one", "foo bar baz", threadIdTagMap);
         final MetricName metricNameTwo = new MetricName("metric-two", 
"test-group-two", "description two", threadIdWithStateUpdaterTagMap);

Reply via email to