This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new a1b5591adb3 KAFKA-19529: State updater sensor names should be unique
(#20262) (#20272)
a1b5591adb3 is described below
commit a1b5591adb3d3b74ba50a4eb898513720a6b15a1
Author: Lucas Brutschy <[email protected]>
AuthorDate: Fri Aug 1 08:25:52 2025 +0200
KAFKA-19529: State updater sensor names should be unique (#20262) (#20272)
All state updater threads use the same metrics instance, but do not use
unique names for their sensors. This can have the following symptoms:
1) Data inserted into one sensor by one thread can affect the metrics of
all state updater threads.
2) If one state updater thread is shut down, the metrics associated with
all state updater threads are removed.
3) If one state updater thread is started, while another one is removed,
it can happen that a metric is registered with the `Metrics` instance,
but not associated to any `Sensor` (because it is concurrently removed),
which means that the metric will not be removed upon shutdown. If a
thread with the same name later tries to register the same metric, we
may run into a `java.lang.IllegalArgumentException: A metric named ...
already exists`, as described in the ticket.
This change fixes the bug by giving unique names to the sensors. A test
is added to verify that removing the sensors and metrics of one state
updater during shutdown does not interfere with those of another.
Reviewers: Matthias J. Sax <[email protected]>
---
.../processor/internals/DefaultStateUpdater.java | 54 +++++++++++----------
.../streams/processor/internals/StreamThread.java | 2 +-
.../internals/DefaultStateUpdaterTest.java | 56 +++++++++++++++++++++-
3 files changed, 83 insertions(+), 29 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
index d4d90246ca7..c9bfbee8353 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java
@@ -36,6 +36,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task.State;
import org.apache.kafka.streams.processor.internals.TaskAndAction.Action;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;
@@ -90,7 +91,7 @@ public class DefaultStateUpdater implements StateUpdater {
private volatile KafkaFutureImpl<Uuid> clientInstanceIdFuture = new
KafkaFutureImpl<>();
public StateUpdaterThread(final String name,
- final Metrics metrics,
+ final StreamsMetricsImpl metrics,
final ChangelogReader changelogReader) {
super(name);
this.changelogReader = changelogReader;
@@ -747,7 +748,7 @@ public class DefaultStateUpdater implements StateUpdater {
private final Time time;
private final Logger log;
private final String name;
- private final Metrics metrics;
+ private final StreamsMetricsImpl metrics;
private final Consumer<byte[], byte[]> restoreConsumer;
private final ChangelogReader changelogReader;
private final TopologyMetadata topologyMetadata;
@@ -769,7 +770,7 @@ public class DefaultStateUpdater implements StateUpdater {
private CountDownLatch shutdownGate;
public DefaultStateUpdater(final String name,
- final Metrics metrics,
+ final StreamsMetricsImpl metrics,
final StreamsConfig config,
final Consumer<byte[], byte[]> restoreConsumer,
final ChangelogReader changelogReader,
@@ -1062,74 +1063,75 @@ public class DefaultStateUpdater implements
StateUpdater {
private final Sensor standbyRestoreRatioSensor;
private final Sensor checkpointRatioSensor;
- private final Deque<String> allSensorNames = new LinkedList<>();
+ private final Deque<Sensor> allSensors = new LinkedList<>();
private final Deque<MetricName> allMetricNames = new LinkedList<>();
- private StateUpdaterMetrics(final Metrics metrics, final String
threadId) {
+ private StateUpdaterMetrics(final StreamsMetricsImpl metrics, final
String threadId) {
final Map<String, String> threadLevelTags = new LinkedHashMap<>();
threadLevelTags.put(THREAD_ID_TAG, threadId);
+ final Metrics metricsRegistry = metrics.metricsRegistry();
- MetricName metricName =
metrics.metricName("active-restoring-tasks",
+ MetricName metricName =
metricsRegistry.metricName("active-restoring-tasks",
STATE_LEVEL_GROUP,
"The number of active tasks currently undergoing restoration",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.getNumRestoringActiveTasks() : 0);
allMetricNames.push(metricName);
- metricName = metrics.metricName("standby-updating-tasks",
+ metricName = metricsRegistry.metricName("standby-updating-tasks",
STATE_LEVEL_GROUP,
"The number of standby tasks currently undergoing state
update",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.getNumUpdatingStandbyTasks() : 0);
allMetricNames.push(metricName);
- metricName = metrics.metricName("active-paused-tasks",
+ metricName = metricsRegistry.metricName("active-paused-tasks",
STATE_LEVEL_GROUP,
"The number of active tasks paused restoring",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.getNumPausedActiveTasks() : 0);
allMetricNames.push(metricName);
- metricName = metrics.metricName("standby-paused-tasks",
+ metricName = metricsRegistry.metricName("standby-paused-tasks",
STATE_LEVEL_GROUP,
"The number of standby tasks paused state update",
threadLevelTags);
- metrics.addMetric(metricName, (config, now) -> stateUpdaterThread
!= null ?
+ metricsRegistry.addMetric(metricName, (config, now) ->
stateUpdaterThread != null ?
stateUpdaterThread.getNumPausedStandbyTasks() : 0);
allMetricNames.push(metricName);
- this.idleRatioSensor = metrics.sensor("idle-ratio",
RecordingLevel.INFO);
+ this.idleRatioSensor = metrics.threadLevelSensor(threadId,
"idle-ratio", RecordingLevel.INFO);
this.idleRatioSensor.add(new MetricName("idle-ratio",
STATE_LEVEL_GROUP, IDLE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
- allSensorNames.add("idle-ratio");
+ allSensors.add(this.idleRatioSensor);
- this.activeRestoreRatioSensor =
metrics.sensor("active-restore-ratio", RecordingLevel.INFO);
+ this.activeRestoreRatioSensor =
metrics.threadLevelSensor(threadId, "active-restore-ratio",
RecordingLevel.INFO);
this.activeRestoreRatioSensor.add(new
MetricName("active-restore-ratio", STATE_LEVEL_GROUP,
RESTORE_RATIO_DESCRIPTION, threadLevelTags), new Avg());
- allSensorNames.add("active-restore-ratio");
+ allSensors.add(this.activeRestoreRatioSensor);
- this.standbyRestoreRatioSensor =
metrics.sensor("standby-update-ratio", RecordingLevel.INFO);
+ this.standbyRestoreRatioSensor =
metrics.threadLevelSensor(threadId, "standby-update-ratio",
RecordingLevel.INFO);
this.standbyRestoreRatioSensor.add(new
MetricName("standby-update-ratio", STATE_LEVEL_GROUP, UPDATE_RATIO_DESCRIPTION,
threadLevelTags), new Avg());
- allSensorNames.add("standby-update-ratio");
+ allSensors.add(this.standbyRestoreRatioSensor);
- this.checkpointRatioSensor = metrics.sensor("checkpoint-ratio",
RecordingLevel.INFO);
+ this.checkpointRatioSensor = metrics.threadLevelSensor(threadId,
"checkpoint-ratio", RecordingLevel.INFO);
this.checkpointRatioSensor.add(new MetricName("checkpoint-ratio",
STATE_LEVEL_GROUP, CHECKPOINT_RATIO_DESCRIPTION, threadLevelTags), new Avg());
- allSensorNames.add("checkpoint-ratio");
+ allSensors.add(this.checkpointRatioSensor);
- this.restoreSensor = metrics.sensor("restore-records",
RecordingLevel.INFO);
+ this.restoreSensor = metrics.threadLevelSensor(threadId,
"restore-records", RecordingLevel.INFO);
this.restoreSensor.add(new MetricName("restore-records-rate",
STATE_LEVEL_GROUP, RESTORE_RECORDS_RATE_DESCRIPTION, threadLevelTags), new
Rate());
this.restoreSensor.add(new MetricName("restore-call-rate",
STATE_LEVEL_GROUP, RESTORE_RATE_DESCRIPTION, threadLevelTags), new Rate(new
WindowedCount()));
- allSensorNames.add("restore-records");
+ allSensors.add(this.restoreSensor);
}
void clear() {
- while (!allSensorNames.isEmpty()) {
- metrics.removeSensor(allSensorNames.pop());
+ while (!allSensors.isEmpty()) {
+ metrics.removeSensor(allSensors.pop());
}
while (!allMetricNames.isEmpty()) {
- metrics.removeMetric(allMetricNames.pop());
+ metrics.metricsRegistry().removeMetric(allMetricNames.pop());
}
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 1243c85af0b..76a400026f6 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -537,7 +537,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
final String name = clientId + "-StateUpdater-" + threadIdx;
final StateUpdater stateUpdater = new DefaultStateUpdater(
name,
- streamsMetrics.metricsRegistry(),
+ streamsMetrics,
streamsConfig,
restoreConsumer,
changelogReader,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
index 7058ae9125f..ce71cd49d1b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java
@@ -28,6 +28,7 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.processor.TaskId;
import
org.apache.kafka.streams.processor.internals.StateUpdater.ExceptionAndTask;
import org.apache.kafka.streams.processor.internals.Task.State;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.AfterEach;
@@ -106,7 +107,7 @@ class DefaultStateUpdaterTest {
// need an auto-tick timer to work for draining with timeout
private final Time time = new MockTime(1L);
- private final Metrics metrics = new Metrics(time);
+ private final StreamsMetricsImpl metrics = new StreamsMetricsImpl(new
Metrics(time), "", "", time);
private final StreamsConfig config = new
StreamsConfig(configProps(COMMIT_INTERVAL));
private final ChangelogReader changelogReader =
mock(ChangelogReader.class);
private final TopologyMetadata topologyMetadata =
unnamedTopology().build();
@@ -1672,8 +1673,59 @@ class DefaultStateUpdaterTest {
assertThat(metrics.metrics().size(), is(1));
}
+ @Test
+ public void shouldRemoveMetricsWithoutInterference() {
+ final DefaultStateUpdater stateUpdater2 =
+ new DefaultStateUpdater("test-state-updater2", metrics, config,
null, changelogReader, topologyMetadata, time);
+ final List<MetricName> threadMetrics =
getMetricNames("test-state-updater");
+ final List<MetricName> threadMetrics2 =
getMetricNames("test-state-updater2");
+
+ stateUpdater.start();
+ stateUpdater2.start();
+
+ for (final MetricName metricName : threadMetrics) {
+ assertTrue(metrics.metrics().containsKey(metricName));
+ }
+ for (final MetricName metricName : threadMetrics2) {
+ assertTrue(metrics.metrics().containsKey(metricName));
+ }
+
+ stateUpdater2.shutdown(Duration.ofMinutes(1));
+
+ for (final MetricName metricName : threadMetrics) {
+ assertTrue(metrics.metrics().containsKey(metricName));
+ }
+ for (final MetricName metricName : threadMetrics2) {
+ assertFalse(metrics.metrics().containsKey(metricName));
+ }
+
+ stateUpdater.shutdown(Duration.ofMinutes(1));
+
+ for (final MetricName metricName : threadMetrics) {
+ assertFalse(metrics.metrics().containsKey(metricName));
+ }
+ for (final MetricName metricName : threadMetrics2) {
+ assertFalse(metrics.metrics().containsKey(metricName));
+ }
+ }
+
+ private static List<MetricName> getMetricNames(final String threadId) {
+ final Map<String, String> tagMap = mkMap(mkEntry("thread-id",
threadId));
+ return Arrays.asList(
+ new MetricName("active-restoring-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("standby-updating-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("active-paused-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("standby-paused-tasks",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("idle-ratio", "stream-state-updater-metrics", "",
tagMap),
+ new MetricName("standby-update-ratio",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("checkpoint-ratio", "stream-state-updater-metrics",
"", tagMap),
+ new MetricName("restore-records-rate",
"stream-state-updater-metrics", "", tagMap),
+ new MetricName("restore-call-rate",
"stream-state-updater-metrics", "", tagMap)
+ );
+ }
+
@SuppressWarnings("unchecked")
- private static <T> void verifyMetric(final Metrics metrics,
+ private static <T> void verifyMetric(final StreamsMetricsImpl metrics,
final MetricName metricName,
final Matcher<T> matcher) {
assertThat(metrics.metrics().get(metricName).metricName().description(),
is(metricName.description()));