This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 87a299a0810 KAFKA-18826: Add global thread metrics (#18953)
87a299a0810 is described below
commit 87a299a08103df50c8ceb344b15359ea91c1bf23
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Apr 2 09:17:49 2025 -0400
KAFKA-18826: Add global thread metrics (#18953)
When implementing
[KIP-1091](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1091%3A+Improved+Kafka+Streams+operator+metrics),
metrics for the Global Stream thread were overlooked. This ticket adds
the Global Thread metrics so they are available via the KIP-1076 mechanism
for exporting Kafka client metrics.
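For context: the export path attaches a delegating MetricsReporter to each
thread's consumer and forwards only the metrics tagged with that thread's id.
Below is a minimal sketch of the matching logic, using a hypothetical
`ThreadIdMatcher` helper (the real class is
`StreamsThreadMetricsDelegatingReporter`, changed in this commit to take an
`Optional` state-updater id because the global thread has none):

```java
import java.util.Map;
import java.util.Optional;

// Sketch only; mirrors the tag-matching logic in the diff below.
final class ThreadIdMatcher {
    private static final String THREAD_ID_TAG = "thread-id";
    private final String threadId;                       // owning thread, e.g. "...-GlobalStreamThread"
    private final Optional<String> stateUpdaterThreadId; // Optional.empty() for the global thread

    ThreadIdMatcher(final String threadId, final Optional<String> stateUpdaterThreadId) {
        this.threadId = threadId;
        this.stateUpdaterThreadId = stateUpdaterThreadId;
    }

    // A metric is forwarded only when its thread-id tag names the owning
    // thread or its state updater (when one exists).
    boolean matches(final Map<String, String> tags) {
        return tags.containsKey(THREAD_ID_TAG)
                && (tags.get(THREAD_ID_TAG).equals(threadId)
                    || Optional.ofNullable(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
    }
}
```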
The existing integration test has been updated to confirm that
GlobalStreamThread metrics are sent via the broker plugin.
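As a reference for the naming convention the test asserts on, a minimal
sketch (hypothetical `telemetryMetricName` helper, not part of this commit)
of how a Streams metric maps to the telemetry name the broker plugin
receives:

```java
// Example: group "stream-thread-metrics" + name "poll-ratio"
// -> "org.apache.kafka.stream.thread.poll.ratio"
static String telemetryMetricName(final String group, final String name) {
    final String prefix = group.replace("-metrics", "").replace('-', '.');
    return "org.apache.kafka." + prefix + "." + name.replace('-', '.');
}
```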
Reviewers: Matthias Sax <[email protected]>
---
.../KafkaStreamsTelemetryIntegrationTest.java | 129 +++++++++++++++++----
.../StreamsThreadMetricsDelegatingReporter.java | 10 +-
.../processor/internals/GlobalStreamThread.java | 4 +
.../streams/processor/internals/StreamThread.java | 2 +-
...StreamsThreadMetricsDelegatingReporterTest.java | 3 +-
5 files changed, 120 insertions(+), 28 deletions(-)
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
index 750b2381a1a..63ef1eb4ea8 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsTelemetryIntegrationTest.java
@@ -48,6 +48,12 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ClientMetricsCommand;
@@ -98,8 +104,11 @@ public class KafkaStreamsTelemetryIntegrationTest {
private String outputTopicTwoPartitions;
private String inputTopicOnePartition;
private String outputTopicOnePartition;
+ private String globalStoreTopic;
+ private Uuid globalStoreConsumerInstanceId;
private Properties streamsApplicationProperties = new Properties();
private Properties streamsSecondApplicationProperties = new Properties();
+ private KeyValueIterator<String, String> globalStoreIterator;
private static EmbeddedKafkaCluster cluster;
private static final List<TestingMetricsInterceptingConsumer<byte[], byte[]>> INTERCEPTING_CONSUMERS = new ArrayList<>();
@@ -125,10 +134,12 @@ public class KafkaStreamsTelemetryIntegrationTest {
outputTopicTwoPartitions = appId + "-output-two";
inputTopicOnePartition = appId + "-input-one";
outputTopicOnePartition = appId + "-output-one";
+ globalStoreTopic = appId + "-global-store";
cluster.createTopic(inputTopicTwoPartitions, 2, 1);
cluster.createTopic(outputTopicTwoPartitions, 2, 1);
cluster.createTopic(inputTopicOnePartition, 1, 1);
cluster.createTopic(outputTopicOnePartition, 1, 1);
+ cluster.createTopic(globalStoreTopic, 2, 1);
}
@AfterAll
@@ -144,6 +155,47 @@ public class KafkaStreamsTelemetryIntegrationTest {
if (!streamsSecondApplicationProperties.isEmpty()) {
IntegrationTestUtils.purgeLocalStreamsState(streamsSecondApplicationProperties);
}
+ if (globalStoreIterator != null) {
+ globalStoreIterator.close();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"INFO", "DEBUG", "TRACE"})
+ public void shouldPushGlobalThreadMetricsToBroker(final String recordingLevel) throws Exception {
+ streamsApplicationProperties = props(true);
+ streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
+ final Topology topology = simpleTopology(true);
+ subscribeForStreamsMetrics();
+ try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
+ IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
+ final ClientInstanceIds clientInstanceIds = streams.clientInstanceIds(Duration.ofSeconds(60));
+ for (final Map.Entry<String, Uuid> instanceId : clientInstanceIds.consumerInstanceIds().entrySet()) {
+ final String instanceIdKey = instanceId.getKey();
+ if (instanceIdKey.endsWith("GlobalStreamThread-global-consumer")) {
+ globalStoreConsumerInstanceId = instanceId.getValue();
+ }
+ }
+
+ assertNotNull(globalStoreConsumerInstanceId);
+ LOG.info("Global consumer instance id {}",
globalStoreConsumerInstanceId);
+ TestUtils.waitForCondition(
+ () -> !TelemetryPlugin.SUBSCRIBED_METRICS.getOrDefault(globalStoreConsumerInstanceId, Collections.emptyList()).isEmpty(),
+ 30_000,
+ "Never received subscribed metrics"
+ );
+
+ final List<String> expectedGlobalMetrics = streams.metrics().values().stream().map(Metric::metricName)
+ .filter(metricName -> metricName.tags().containsKey("thread-id") &&
+ metricName.tags().get("thread-id").endsWith("-GlobalStreamThread")).map(mn -> {
+ final String name = mn.name().replace('-', '.');
+ final String group = mn.group().replace("-metrics", "").replace('-', '.');
+ return "org.apache.kafka." + group + "." + name;
+ }).filter(name -> !name.equals("org.apache.kafka.stream.thread.state")) // telemetry reporter filters out string metrics
+ .sorted().toList();
+ final List<String> actualGlobalMetrics = new ArrayList<>(TelemetryPlugin.SUBSCRIBED_METRICS.get(globalStoreConsumerInstanceId));
+ assertEquals(expectedGlobalMetrics, actualGlobalMetrics);
+ }
}
@ParameterizedTest
@@ -152,7 +204,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
// End-to-end test validating metrics pushed to broker
streamsApplicationProperties = props(true);
streamsApplicationProperties.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, recordingLevel);
- final Topology topology = simpleTopology();
+ final Topology topology = simpleTopology(false);
subscribeForStreamsMetrics();
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
@@ -215,21 +267,21 @@ public class KafkaStreamsTelemetryIntegrationTest {
public void shouldPassMetrics(final String topologyType, final boolean stateUpdaterEnabled) throws Exception {
// Streams metrics should get passed to Admin and Consumer
streamsApplicationProperties = props(stateUpdaterEnabled);
- final Topology topology = topologyType.equals("simple") ? simpleTopology() : complexTopology();
+ final Topology topology = topologyType.equals("simple") ? simpleTopology(false) : complexTopology();
try (final KafkaStreams streams = new KafkaStreams(topology, streamsApplicationProperties)) {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+ .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();
final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+ .filter(metricName -> metricName.group().equals("stream-metrics")).toList();
- final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
- final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).collect(Collectors.toList());
+ final List<MetricName> consumerPassedStreamThreadMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
+ final List<MetricName> adminPassedStreamClientMetricNames = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName).toList();
assertEquals(streamsThreadMetrics.size(), consumerPassedStreamThreadMetricNames.size());
@@ -259,10 +311,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streamsOne);
final List<MetricName> streamsTaskMetricNames = streamsOne.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> consumerPassedStreamTaskMetricNames = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).passedMetrics.stream().map(KafkaMetric::metricName)
- .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
/*
With only one instance, Kafka Streams should register task metrics for all tasks 0_0, 0_1, 1_0, 1_1
@@ -293,24 +345,24 @@ public class KafkaStreamsTelemetryIntegrationTest {
);
final List<MetricName> streamsOneTaskMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> streamsOneStateMetrics = streamsOne.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+ .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
final List<MetricName> consumerOnePassedTaskMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
- .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> consumerOnePassedStateMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT)
- .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
final List<MetricName> streamsTwoTaskMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ .filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> streamsTwoStateMetrics = streamsTwo.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+ .filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
final List<MetricName> consumerTwoPassedTaskMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
- .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).collect(Collectors.toList());
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.tags().containsKey("task-id")).toList();
final List<MetricName> consumerTwoPassedStateMetrics = INTERCEPTING_CONSUMERS.get(SECOND_INSTANCE_CLIENT)
- .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).collect(Collectors.toList());
+ .passedMetrics.stream().map(KafkaMetric::metricName).filter(metricName -> metricName.group().equals("stream-state-metrics")).toList();
/*
Confirm pre-existing KafkaStreams instance one only passes metrics for its tasks and has no metrics for previous tasks
*/
@@ -350,10 +402,10 @@ public class KafkaStreamsTelemetryIntegrationTest {
IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
final List<MetricName> streamsThreadMetrics = streams.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.tags().containsKey("thread-id")).collect(Collectors.toList());
+ .filter(metricName -> metricName.tags().containsKey("thread-id")).toList();
final List<MetricName> streamsClientMetrics = streams.metrics().values().stream().map(Metric::metricName)
- .filter(metricName -> metricName.group().equals("stream-metrics")).collect(Collectors.toList());
+ .filter(metricName -> metricName.group().equals("stream-metrics")).toList();
final Map<MetricName, ? extends Metric> embeddedConsumerMetrics = INTERCEPTING_CONSUMERS.get(FIRST_INSTANCE_CLIENT).metrics();
final Map<MetricName, ? extends Metric> embeddedAdminMetrics = INTERCEPTING_ADMIN_CLIENTS.get(FIRST_INSTANCE_CLIENT).metrics();
@@ -419,8 +471,41 @@ public class KafkaStreamsTelemetryIntegrationTest {
return builder.build();
}
- private Topology simpleTopology() {
+
+ private void addGlobalStore(final StreamsBuilder builder) {
+ builder.addGlobalStore(
+ Stores.keyValueStoreBuilder(
+ Stores.inMemoryKeyValueStore("iq-test-store"),
+ Serdes.String(),
+ Serdes.String()
+ ),
+ globalStoreTopic,
+ Consumed.with(Serdes.String(), Serdes.String()),
+ () -> new Processor<>() {
+
+ // The store iterator is intentionally not closed here: it needs
+ // to stay open during the test so the Streams app emits the expected
+ // org.apache.kafka.stream.state.oldest.iterator.open.since.ms metric.
+ // The globalStoreIterator is therefore a global variable (pun not
+ // intended), so it can be closed in the tearDown method.
+ @SuppressWarnings("unchecked")
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ globalStoreIterator = ((KeyValueStore<String, String>) context.getStateStore("iq-test-store")).all();
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ // no-op
+ }
+ });
+ }
+
+ private Topology simpleTopology(final boolean includeGlobalStore) {
final StreamsBuilder builder = new StreamsBuilder();
+ if (includeGlobalStore) {
+ addGlobalStore(builder);
+ }
builder.stream(inputTopicOnePartition, Consumed.with(Serdes.String(), Serdes.String()))
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
.to(outputTopicOnePartition, Produced.with(Serdes.String(), Serdes.String()));
@@ -449,7 +534,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
@Override
public Consumer<byte[], byte[]> getGlobalConsumer(final Map<String, Object> config) {
- return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
+ return new TestingMetricsInterceptingConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer());
}
@Override
@@ -525,7 +610,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
.stream()
.flatMap(rm -> rm.getScopeMetricsList().stream())
.flatMap(sm -> sm.getMetricsList().stream())
- .map(metric -> metric.getGauge())
+ .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getGauge)
.flatMap(gauge -> gauge.getDataPointsList().stream())
.flatMap(numberDataPoint -> numberDataPoint.getAttributesList().stream())
.filter(keyValue -> keyValue.getKey().equals("process_id"))
@@ -539,7 +624,7 @@ public class KafkaStreamsTelemetryIntegrationTest {
.stream()
.flatMap(rm -> rm.getScopeMetricsList().stream())
.flatMap(sm -> sm.getMetricsList().stream())
- .map(metric -> metric.getName())
+ .map(org.apache.kafka.shaded.io.opentelemetry.proto.metrics.v1.Metric::getName)
.sorted()
.collect(Collectors.toList());
LOG.info("Found metrics {} for clientId={}", metricNames,
clientId);
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
index 0e2a238a29a..65b7990dfe0 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporter.java
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
@@ -34,13 +35,13 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
private static final String THREAD_ID_TAG = "thread-id";
private final Consumer<byte[], byte[]> consumer;
private final String threadId;
- private final String stateUpdaterThreadId;
+ private final Optional<String> stateUpdaterThreadId;
- public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final String stateUpdaterThreadId) {
+ public StreamsThreadMetricsDelegatingReporter(final Consumer<byte[], byte[]> consumer, final String threadId, final Optional<String> stateUpdaterThreadId) {
this.consumer = Objects.requireNonNull(consumer);
this.threadId = Objects.requireNonNull(threadId);
- this.stateUpdaterThreadId = Objects.requireNonNull(stateUpdaterThreadId);
+ this.stateUpdaterThreadId = stateUpdaterThreadId;
log.debug("Creating MetricsReporter for threadId {} and stateUpdaterId
{}", threadId, stateUpdaterThreadId);
}
@@ -59,7 +60,8 @@ public class StreamsThreadMetricsDelegatingReporter implements MetricsReporter {
private boolean tagMatchStreamOrStateUpdaterThreadId(final KafkaMetric metric) {
final Map<String, String> tags = metric.metricName().tags();
- final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) || tags.get(THREAD_ID_TAG).equals(stateUpdaterThreadId));
+ final boolean shouldInclude = tags.containsKey(THREAD_ID_TAG) && (tags.get(THREAD_ID_TAG).equals(threadId) ||
+ Optional.ofNullable(tags.get(THREAD_ID_TAG)).equals(stateUpdaterThreadId));
if (!shouldInclude) {
log.trace("Rejecting metric {}", metric.metricName());
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
index aeed9dd332f..173ebdb8d4b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.internals.metrics.StreamsThreadMetricsDelegatingReporter;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
@@ -43,6 +44,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
@@ -390,6 +392,8 @@ public class GlobalStreamThread extends Thread {
time
);
stateMgr.setGlobalProcessorContext(globalProcessorContext);
+ final StreamsThreadMetricsDelegatingReporter globalMetricsReporter = new StreamsThreadMetricsDelegatingReporter(globalConsumer, getName(), Optional.empty());
+ streamsMetrics.metricsRegistry().addReporter(globalMetricsReporter);
stateConsumer = new StateConsumer(
logContext,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f505fab1554..74e7a27ceb9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -482,7 +482,7 @@ public class StreamThread extends Thread implements ProcessingThread {
taskManager.setMainConsumer(mainConsumer);
referenceContainer.mainConsumer = mainConsumer;
- final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, stateUpdaterId);
+ final StreamsThreadMetricsDelegatingReporter reporter = new StreamsThreadMetricsDelegatingReporter(mainConsumer, threadId, Optional.of(stateUpdaterId));
streamsMetrics.metricsRegistry().addReporter(reporter);
final StreamThread streamThread = new StreamThread(
diff --git a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
index faf30334e73..6cdfa442b34 100644
--- a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/StreamsThreadMetricsDelegatingReporterTest.java
@@ -33,6 +33,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -63,7 +64,7 @@ class StreamsThreadMetricsDelegatingReporterTest {
noThreadIdTagMap.put("client-id", "foo");
mockConsumer = new MockConsumer<>(AutoOffsetResetStrategy.NONE.name());
- streamsThreadMetricsDelegatingReporter = new StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, stateUpdaterId);
+ streamsThreadMetricsDelegatingReporter = new StreamsThreadMetricsDelegatingReporter(mockConsumer, threadId, Optional.of(stateUpdaterId));
final MetricName metricNameOne = new MetricName("metric-one",
"test-group-one", "foo bar baz", threadIdTagMap);
final MetricName metricNameTwo = new MetricName("metric-two",
"test-group-two", "description two", threadIdWithStateUpdaterTagMap);