This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new fb054b590e7 KAFKA-19398: (De)Register oldest-iterator-open-since-ms 
metric dynamically (#20022)
fb054b590e7 is described below

commit fb054b590e774374f4e9ce7ad5e69b3f98ffbd77
Author: Matthias J. Sax <[email protected]>
AuthorDate: Tue Jun 24 17:01:36 2025 -0700

    KAFKA-19398: (De)Register oldest-iterator-open-since-ms metric dynamically 
(#20022)
    
    The metric for oldest-iterator-open-since-ms might report a null value
    if there is no open iterator.
    
    This PR changes the behavior to dynamically register/deregister the
    entire metric instead of allowing it to return a null value.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../streams/internals/metrics/OpenIterators.java   | 73 ++++++++++++++++++++++
 .../internals/metrics/StreamsMetricsImpl.java      | 20 +++---
 .../state/internals/MeteredKeyValueStore.java      | 22 ++-----
 .../MeteredMultiVersionedKeyQueryIterator.java     | 12 +---
 .../internals/MeteredTimestampedKeyValueStore.java |  2 -
 .../internals/MeteredVersionedKeyValueStore.java   |  1 -
 .../state/internals/metrics/StateStoreMetrics.java | 13 ++--
 .../state/internals/MeteredKeyValueStoreTest.java  | 12 ++--
 .../MeteredTimestampedKeyValueStoreTest.java       | 12 ++--
 .../MeteredVersionedKeyValueStoreTest.java         | 12 ++--
 10 files changed, 121 insertions(+), 58 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
new file mode 100644
index 00000000000..5e2307d0ac6
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.internals.metrics;
+
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.state.internals.MeteredIterator;
+import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
+
+import java.util.Comparator;
+import java.util.NavigableSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.LongAdder;
+
+public class OpenIterators { // Tracks open MeteredIterators; add() registers the oldest-open-iterator gauge when the count reads 1, remove() deregisters it when the count reads 1.
+    private final TaskId taskId;
+    private final String metricsScope;
+    private final String name; // passed in the storeName position of addOldestOpenIteratorGauge
+    private final StreamsMetricsImpl streamsMetrics;
+
+    private final LongAdder numOpenIterators = new LongAdder(); // running count, exposed via sum()
+    private final NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp)); // ordered by start timestamp, so first() is the oldest. NOTE(review): iterators with equal startTimestamp compare as equal, so the set holds only one of them - confirm this is acceptable.
+
+    private MetricName metricName; // set in add(), read in remove(). NOTE(review): plain field - confirm add()/remove() are never invoked from different threads.
+
+    public OpenIterators(final TaskId taskId,
+                         final String metricsScope,
+                         final String name,
+                         final StreamsMetricsImpl streamsMetrics) {
+        this.taskId = taskId;
+        this.metricsScope = metricsScope;
+        this.name = name;
+        this.streamsMetrics = streamsMetrics;
+    }
+
+    public void add(final MeteredIterator iterator) { // records a newly opened iterator
+        openIterators.add(iterator);
+        numOpenIterators.increment();
+
+        if (numOpenIterators.intValue() == 1) { // count just became 1: register the gauge. NOTE(review): increment + read is not atomic as a pair.
+            metricName = 
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, 
name, streamsMetrics,
+                (config, now) -> openIterators.first().startTimestamp() // NOTE(review): first() throws NoSuchElementException on an empty set; relies on the gauge being removed before the set empties.
+            );
+        }
+    }
+
+    public void remove(final MeteredIterator iterator) { // records that an iterator was closed
+        if (numOpenIterators.intValue() == 1) { // last open iterator: deregister the gauge before the set is emptied below
+            streamsMetrics.removeMetric(metricName);
+        }
+        numOpenIterators.decrement();
+        openIterators.remove(iterator);
+    }
+
+    public long sum() { // current value of the open-iterator counter
+        return numOpenIterators.sum();
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
index 0dd3f77f199..7c22c94e9af 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
@@ -335,6 +335,10 @@ public class StreamsMetricsImpl implements StreamsMetrics {
         }
     }
 
+    public void removeMetric(final MetricName metricName) { // removes a single metric by delegating to metrics.removeMetric
+        metrics.removeMetric(metricName); // NOTE(review): does not touch storeLevelMetrics, to which addStoreLevelMutableMetric pushes the name - confirm repeated register/deregister cycles do not accumulate entries there.
+    }
+
     public Map<String, String> taskLevelTagMap(final String threadId, final 
String taskId) {
         final Map<String, String> tagMap = new LinkedHashMap<>();
         tagMap.put(THREAD_ID_TAG, threadId);
@@ -517,13 +521,13 @@ public class StreamsMetricsImpl implements StreamsMetrics 
{
         return getSensors(storeLevelSensors, sensorSuffix, sensorPrefix, 
recordingLevel, parents);
     }
 
-    public <T> void addStoreLevelMutableMetric(final String taskId,
-                                               final String metricsScope,
-                                               final String storeName,
-                                               final String name,
-                                               final String description,
-                                               final RecordingLevel 
recordingLevel,
-                                               final Gauge<T> valueProvider) {
+    public <T> MetricName addStoreLevelMutableMetric(final String taskId,
+                                                     final String metricsScope,
+                                                     final String storeName,
+                                                     final String name,
+                                                     final String description,
+                                                     final RecordingLevel 
recordingLevel,
+                                                     final Gauge<T> 
valueProvider) {
         final MetricName metricName = metrics.metricName(
             name,
             STATE_STORE_LEVEL_GROUP,
@@ -535,6 +539,8 @@ public class StreamsMetricsImpl implements StreamsMetrics {
             final String key = 
storeSensorPrefix(Thread.currentThread().getName(), taskId, storeName);
             storeLevelMetrics.computeIfAbsent(key, ignored -> new 
LinkedList<>()).push(metricName);
         }
+
+        return metricName;
     }
 
     public final void removeAllStoreLevelSensorsAndMetrics(final String taskId,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 7622f904c17..0962033b7ef 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.internals.metrics.OpenIterators;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.StateStore;
@@ -48,14 +49,9 @@ import 
org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
 import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.NavigableSet;
 import java.util.Objects;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -96,8 +92,7 @@ public class MeteredKeyValueStore<K, V>
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
-    protected LongAdder numOpenIterators = new LongAdder();
-    protected NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
+    protected OpenIterators openIterators;
 
     @SuppressWarnings("rawtypes")
     private final Map<Class, QueryHandler> queryHandlers =
@@ -153,13 +148,8 @@ public class MeteredKeyValueStore<K, V>
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> numOpenIterators.sum());
-        StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> {
-                    final Iterator<MeteredIterator> openIteratorsIterator = 
openIterators.iterator();
-                    return openIteratorsIterator.hasNext() ? 
openIteratorsIterator.next().startTimestamp() : null;
-                }
-        );
+                (config, now) -> openIterators.sum());
+        openIterators = new OpenIterators(taskId, metricsScope, name(), 
streamsMetrics);
     }
 
     protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, 
final SerdeGetter getter) {
@@ -445,7 +435,6 @@ public class MeteredKeyValueStore<K, V>
             this.sensor = sensor;
             this.startTimestamp = time.milliseconds();
             this.startNs = time.nanoseconds();
-            numOpenIterators.increment();
             openIterators.add(this);
         }
 
@@ -475,7 +464,6 @@ public class MeteredKeyValueStore<K, V>
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
-                numOpenIterators.decrement();
                 openIterators.remove(this);
             }
         }
@@ -502,7 +490,6 @@ public class MeteredKeyValueStore<K, V>
             this.valueDeserializer = valueDeserializer;
             this.startTimestamp = time.milliseconds();
             this.startNs = time.nanoseconds();
-            numOpenIterators.increment();
             openIterators.add(this);
         }
 
@@ -532,7 +519,6 @@ public class MeteredKeyValueStore<K, V>
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
-                numOpenIterators.decrement();
                 openIterators.remove(this);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
index b1bb0b63158..b27e6a78d86 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
@@ -18,39 +18,34 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.internals.metrics.OpenIterators;
 import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.VersionedRecordIterator;
 
-import java.util.Set;
-import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 class MeteredMultiVersionedKeyQueryIterator<V> implements 
VersionedRecordIterator<V>, MeteredIterator {
 
     private final VersionedRecordIterator<byte[]> iterator;
     private final Function<VersionedRecord<byte[]>, VersionedRecord<V>> 
deserializeValue;
-    private final LongAdder numOpenIterators;
     private final Sensor sensor;
     private final Time time;
     private final long startNs;
     private final long startTimestampMs;
-    private final Set<MeteredIterator> openIterators;
+    private final OpenIterators openIterators;
 
     public MeteredMultiVersionedKeyQueryIterator(final 
VersionedRecordIterator<byte[]> iterator,
                                                  final Sensor sensor,
                                                  final Time time,
                                                  final 
Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
-                                                 final LongAdder 
numOpenIterators,
-                                                 final Set<MeteredIterator> 
openIterators) {
+                                                 final OpenIterators 
openIterators) {
         this.iterator = iterator;
         this.deserializeValue = deserializeValue;
-        this.numOpenIterators = numOpenIterators;
         this.openIterators = openIterators;
         this.sensor = sensor;
         this.time = time;
         this.startNs = time.nanoseconds();
         this.startTimestampMs = time.milliseconds();
-        numOpenIterators.increment();
         openIterators.add(this);
     }
 
@@ -65,7 +60,6 @@ class MeteredMultiVersionedKeyQueryIterator<V> implements 
VersionedRecordIterato
             iterator.close();
         } finally {
             sensor.record(time.nanoseconds() - startNs);
-            numOpenIterators.decrement();
             openIterators.remove(this);
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
index 6f90ef56d86..0cfb8936a5e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStore.java
@@ -326,7 +326,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
             this.startNs = time.nanoseconds();
             this.startTimestampMs = time.milliseconds();
             this.returnPlainValue = returnPlainValue;
-            numOpenIterators.increment();
             openIterators.add(this);
         }
 
@@ -360,7 +359,6 @@ public class MeteredTimestampedKeyValueStore<K, V>
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
-                numOpenIterators.decrement();
                 openIterators.remove(this);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 66eb3206def..6afb4d1531d 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -269,7 +269,6 @@ public class MeteredVersionedKeyValueStore<K, V>
                             iteratorDurationSensor,
                             time,
                             StoreQueryUtils.deserializeValue(plainValueSerdes),
-                            numOpenIterators,
                             openIterators
                         );
                 final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> 
typedQueryResult =
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index 2422fa9d5e3..bb60c304684 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state.internals.metrics;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
@@ -455,12 +456,12 @@ public class StateStoreMetrics {
 
     }
 
-    public static void addOldestOpenIteratorGauge(final String taskId,
-                                                  final String storeType,
-                                                  final String storeName,
-                                                  final StreamsMetricsImpl 
streamsMetrics,
-                                                  final Gauge<Long> 
oldestOpenIteratorGauge) {
-        streamsMetrics.addStoreLevelMutableMetric(
+    public static MetricName addOldestOpenIteratorGauge(final String taskId,
+                                                        final String storeType,
+                                                        final String storeName,
+                                                        final 
StreamsMetricsImpl streamsMetrics,
+                                                        final Gauge<Long> 
oldestOpenIteratorGauge) {
+        return streamsMetrics.addStoreLevelMutableMetric(
                 taskId,
                 storeType,
                 storeName,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 4a8c891355d..294af3944f2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -525,15 +525,16 @@ public class MeteredKeyValueStoreTest {
         when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
         init();
 
-        final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
-
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, nullValue());
 
         KeyValueIterator<String, String> second = null;
         final long secondTimestamp;
         try {
             try (final KeyValueIterator<String, String> unused = 
metered.all()) {
+                oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+                assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
                 final long oldestTimestamp = mockTime.milliseconds();
                 assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
                 mockTime.sleep(100);
@@ -553,7 +554,8 @@ public class MeteredKeyValueStoreTest {
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), 
nullValue());
+        oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, nullValue());
     }
 
     private KafkaMetric metric(final MetricName metricName) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index fa42cb07283..2e3c470387c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -503,15 +503,16 @@ public class MeteredTimestampedKeyValueStoreTest {
         when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
         init();
 
-        final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
-
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, nullValue());
 
         KeyValueIterator<String, ValueAndTimestamp<String>> second = null;
         final long secondTimestamp;
         try {
             try (final KeyValueIterator<String, ValueAndTimestamp<String>> 
unused = metered.all()) {
+                oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+                assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
                 final long oldestTimestamp = mockTime.milliseconds();
                 assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
                 mockTime.sleep(100);
@@ -531,6 +532,7 @@ public class MeteredTimestampedKeyValueStoreTest {
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), 
nullValue());
+        oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, nullValue());
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index f0a7f23c09c..8e8e02b2722 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -426,16 +426,17 @@ public class MeteredVersionedKeyValueStoreTest {
         when(inner.query(any(), any(), any())).thenReturn(
                 QueryResult.forResult(new 
LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, 
ResultOrder.ANY)));
 
-        final KafkaMetric oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
-
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        KafkaMetric oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, nullValue());
 
         final QueryResult<VersionedRecordIterator<String>> first = 
store.query(query, bound, config);
         VersionedRecordIterator<String> secondIterator = null;
         final long secondTime;
         try {
             try (final VersionedRecordIterator<String> unused = 
first.getResult()) {
+                oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
+                assertThat(oldestIteratorTimestampMetric, not(nullValue()));
+
                 final long oldestTimestamp = mockTime.milliseconds();
                 assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
                 mockTime.sleep(100);
@@ -457,7 +458,8 @@ public class MeteredVersionedKeyValueStoreTest {
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), 
nullValue());
+        oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, nullValue());
     }
 
     private KafkaMetric getMetric(final String name) {

Reply via email to