This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 44c6e956ed3 KAFKA-19529: State updater sensor names should be unique
(#20262)
44c6e956ed3 is described below
commit 44c6e956ed3e95ef416abb4dc4b6d5bb8ee42cdb
Author: Lucas Brutschy <[email protected]>
AuthorDate: Thu Jul 31 08:58:52 2025 +0200
KAFKA-19529: State updater sensor names should be unique (#20262)
All state updater threads use the same metrics instance, but do not use
unique names for their sensors. This can have the following symptoms:
1) Data inserted into one sensor by one thread can affect the metrics of
all state updater threads.
2) If one state updater thread is shut down, the metrics associated with
all state updater threads are removed.
3) If one state updater thread is started while another one is being shut down,
it can happen that a metric is registered with the `Metrics` instance,
but not associated with any `Sensor` (because it is concurrently removed),
which means that the metric will not be removed upon shutdown. If a
thread with the same name later tries to register the same metric, we
may run into a `java.lang.IllegalArgumentException: A metric named ...
already exists`, as described in the ticket.
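This change fixes the bug by giving the sensors unique, per-thread names
(they are now registered as thread-level sensors via `StreamsMetricsImpl`).
A test is added to verify that shutting down one state updater thread
removes only its own sensors and metrics, without interfering with those
of other state updater threads.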
Reviewers: Matthias J. Sax <[email protected]>
---
.../processor/internals/DefaultStateUpdater.java | 52 ++++++++++----------
.../streams/processor/internals/StreamThread.java | 2 +-
.../internals/DefaultStateUpdaterTest.java | 56 +++++++++++++++++++++-
3 files changed, 82 insertions(+), 28 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index 716d8c42ec5..a3a44f6f02d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
@@ -89,7 +90,7 @@ public class DefaultStateUpdater implements StateUpdater {
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new
KafkaFutureImpl<>();
public StateUpdaterThread(final String name,
- final Metrics metrics,
+ final StreamsMetricsImpl metrics,
final ChangelogReader changelogReader) {
super(name);
this.changelogReader = changelogReader;
@@ -745,7 +746,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final Time time;
private final Logger log;
private final String name;
- private final Metrics metrics;
+ private final StreamsMetricsImpl metrics;
private final Consumer<byte[], byte[]> restoreConsumer;
private final ChangelogReader changelogReader;
private final TopologyMetadata topologyMetadata;
@@ -766,7 +767,7 @@ public class DefaultStateUpdater implements StateUpdater {
private StateUpdaterThread stateUpdaterThread = null;
public DefaultStateUpdater(final String name,
- final Metrics metrics,
+ final StreamsMetricsImpl metrics,
final StreamsConfig config,
final Consumer<byte[], byte[]> restoreConsumer,
final ChangelogReader changelogReader,
@@ -1062,70 +1063,71 @@ public class DefaultStateUpdater implements
StateUpdater {
private final Sensor standbyRestoreRatioSensor;
private final Sensor checkpointRatioSensor;
- private final Deque<String> allSensorNames = new LinkedList<>();
+ private final Deque<Sensor> allSensors = new LinkedList<>();
private final Deque<MetricName> allMetricNames = new LinkedList<>();
- private StateUpdaterMetrics(final Metrics metrics, final String
threadId) {
+ private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final
String threadId) {
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
threadLevelTags.put(THREAD_ID_TAG, threadId);
+ final Metrics metricsRegistry = metrics.metricsRegistry();
- MetricName metricName =
metrics.metricName("active-restoring-tasks",
+ MetricName metricName =
metricsRegistry.metricName("active-restoring-tasks",
STATE_LEVEL_GROUP,
"The number of active tasks currently undergoing restoration",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.numRestoringActiveTasks() : 0);
allMetricNames.push(metricName);
- metricName = metrics.metricName("standby-updating-tasks",
+ metricName = metricsRegistry.metricName("standby-updating-tasks",
STATE_LEVEL_GROUP,
"The number of standby tasks currently undergoing state
update",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.numUpdatingStandbyTasks() : 0);
allMetricNames.push(metricName);
- metricName = metrics.metricName("active-paused-tasks",
+ metricName = metricsRegistry.metricName("active-paused-tasks",
STATE_LEVEL_GROUP,
"The number of active tasks paused restoring",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.numPausedActiveTasks() : 0);
allMetricNames.push(metricName);
- metricName = metrics.metricName("standby-paused-tasks",
+ metricName = metricsRegistry.metricName("standby-paused-tasks",
STATE_LEVEL_GROUP,
"The number of standby tasks paused state update",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.numPausedStandbyTasks() : 0);
allMetricNames.push(metricName);
- this.idleRatioSensor = metrics.sensor("idle-ratio",
RecordingLevel.INFO);
+ this.idleRatioSensor = metrics.threadLevelSensor(threadId,
"idle-ratio", RecordingLevel.INFO);
this.idleRatioSensor.add(new MetricName("idle-ratio",
STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
- allSensorNames.add("idle-ratio");
+ allSensors.add(this.idleRatioSensor);
- this.activeRestoreRatioSensor =
metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
+ this.activeRestoreRatioSensor =
metrics.threadLevelSensor(threadId, "active-restore-ratio",
RecordingLevel.INFO);
this.activeRestoreRatioSensor.add(new
MetricName("active-restore-ratio", STATE_LEVEL_GROUP,
RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
- allSensorNames.add("active-restore-ratio");
+ allSensors.add(this.activeRestoreRatioSensor);
- this.standbyRestoreRatioSensor =
metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
+ this.standbyRestoreRatioSensor =
metrics.threadLevelSensor(threadId, "standby-update-ratio",
RecordingLevel.INFO);
this.standbyRestoreRatioSensor.add(new
MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION,
threadLevelTags), new Avg());
- allSensorNames.add("standby-update-ratio");
+ allSensors.add(this.standbyRestoreRatioSensor);
- this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio",
RecordingLevel.INFO);
+ this.checkpointRatioSensor = metrics.threadLevelSensor(threadId,
"checkpoint-ratio", RecordingLevel.INFO);
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio",
STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
- allSensorNames.add("checkpoint-ratio");
+ allSensors.add(this.checkpointRatioSensor);
- this.restoreSensor = metrics.sensor("restore-records",
RecordingLevel.INFO);
+ this.restoreSensor = metrics.threadLevelSensor(threadId,
"restore-records", RecordingLevel.INFO);
this.restoreSensor.add(new MetricName("restore-records-rate",
STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new
Rate());
this.restoreSensor.add(new MetricName("restore-call-rate",
STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new
WindowedCount()));
- allSensorNames.add("restore-records");
+ allSensors.add(this.restoreSensor);
}
void clear() {
- while (!allSensorNames.isEmpty()) {
- metrics.removeSensor(allSensorNames.pop());
+ while (!allSensors.isEmpty()) {
+ metrics.removeSensor(allSensors.pop());
}
while (!allMetricNames.isEmpty()) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 9509369fa2a..65dec15b3ee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -649,7 +649,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
final String name = clientId + STATE_UPDATER_ID_SUBSTRING +
threadIdx;
return new DefaultStateUpdater(
name,
- streamsMetrics.metricsRegistry(),
+ streamsMetrics,
streamsConfig,
restoreConsumer,
changelogReader,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index abb128698a0..b6d41966257 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import
org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
import org.apache.kafka.streams.processor.internals.Task.State;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
@@ -105,7 +106,7 @@ class DefaultStateUpdaterTest {
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
- private final Metrics metrics = new Metrics(time);
+ private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", "", time);
private final StreamsConfig config = new
StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader =
mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata =
unnamedTopology().build();
@@ -1680,8 +1681,59 @@ class DefaultStateUpdaterTest {
assertThat(metrics.metrics().size(), is(1));
}
+ @Test
+ public void shouldRemoveMetricsWithoutInterference() {
+ final DefaultStateUpdater stateUpdater2 =
+ new DefaultStateUpdater("test-state-updater2", metrics, config,
null, changelogReader, topologyMetadata, time);
+ final List<MetricName> threadMetrics =
getMetricNames("test-state-updater");
+ final List<MetricName> threadMetrics2 =
getMetricNames("test-state-updater2");
+
+ stateUpdater.start();
+ stateUpdater2.start();
+
+ for (final MetricName metricName : threadMetrics) {
+ assertTrue(metrics.metrics().containsKey(metricName));
+ }
+ for (final MetricName metricName : threadMetrics2) {
+ assertTrue(metrics.metrics().containsKey(metricName));
+ }
+
+ stateUpdater2.shutdown(Duration.ofMinutes(1));
+
+ for (final MetricName metricName : threadMetrics) {
+ assertTrue(metrics.metrics().containsKey(metricName));
+ }
+ for (final MetricName metricName : threadMetrics2) {
+ assertFalse(metrics.metrics().containsKey(metricName));
+ }
+
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+
+ for (final MetricName metricName : threadMetrics) {
+ assertFalse(metrics.metrics().containsKey(metricName));
+ }
+ for (final MetricName metricName : threadMetrics2) {
+ assertFalse(metrics.metrics().containsKey(metricName));
+ }
+ }
+
+ private static List<MetricName> getMetricNames(final String threadId) {
+ final Map<String, String> tagMap = Map.of("thread-id", threadId);
+ return List.of(
+ new MetricName("active-restoring-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("standby-updating-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("active-paused-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("standby-paused-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("idle-ratio", "stream-state-updater-metrics", "",
tagMap),
+ new MetricName("standby-update-ratio",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("checkpoint-ratio", "stream-state-updater-metrics",
"", tagMap),
+ new MetricName("restore-records-rate",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("restore-call-rate",
"stream-state-updater-metrics", "", tagMap)
+ );
+ }
+
@SuppressWarnings("unchecked")
- private static <T> void verifyMetric(final Metrics metrics,
+ private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
final MetricName metricName,
final Matcher<T> matcher) {
assertThat(metrics.metrics().get(metricName).metricName().description(),
is(metricName.description()));