This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new 2eae1fb2326 KAFKA-19678: Refactor to reset timestamp to zero when no 
open iterators (#21091)
2eae1fb2326 is described below

commit 2eae1fb232651cae39f7eb2b76836fd7f7759c9e
Author: Bill Bejeck <[email protected]>
AuthorDate: Wed Dec 10 10:25:51 2025 -0500

    KAFKA-19678: Refactor to reset timestamp to zero when no open iterators 
(#21091)
    
    This PR changes the approach for handling the case when there are no
    open iterators for the `oldest-iterator-open-since-ms` metric.
    Previously, we would register the metric when the store opens its first
    iterator, then deregister the metric when it closes the last remaining
    open iterator.  Under heavy load this process can cause performance
    degradation as detailed in the Jira ticket
    https://issues.apache.org/jira/browse/KAFKA-19678.  This PR updates the
    handling of the metric to register and deregister on store opening and
    closing respectively, and handle the case of current open iterators by
    returning a `0` to indicate this state.
    
    Reviewers: Matthias Sax <[email protected]>, Ramesh Sinha
---
 .../streams/internals/metrics/OpenIterators.java   | 86 ----------------------
 .../state/internals/MeteredKeyValueStore.java      | 27 ++++++-
 .../MeteredMultiVersionedKeyQueryIterator.java     | 12 ++-
 .../internals/MeteredVersionedKeyValueStore.java   |  1 +
 .../state/internals/metrics/StateStoreMetrics.java |  2 +-
 .../internals/metrics/OpenIteratorsTest.java       | 73 ------------------
 .../state/internals/MeteredKeyValueStoreTest.java  | 12 +--
 .../MeteredTimestampedKeyValueStoreTest.java       | 11 +--
 .../MeteredVersionedKeyValueStoreTest.java         | 11 +--
 9 files changed, 46 insertions(+), 189 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
 
b/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
deleted file mode 100644
index bdeed2f8978..00000000000
--- 
a/streams/src/main/java/org/apache/kafka/streams/internals/metrics/OpenIterators.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals.metrics;
-
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.MeteredIterator;
-import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
-
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.NavigableSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.LongAdder;
-
-public class OpenIterators {
-    private final TaskId taskId;
-    private final String metricsScope;
-    private final String name;
-    private final StreamsMetricsImpl streamsMetrics;
-
-    private final LongAdder numOpenIterators = new LongAdder();
-    private final NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
-    private final AtomicLong oldestStartTimestamp = new AtomicLong();
-
-    private MetricName metricName;
-
-    public OpenIterators(final TaskId taskId,
-                         final String metricsScope,
-                         final String name,
-                         final StreamsMetricsImpl streamsMetrics) {
-        this.taskId = taskId;
-        this.metricsScope = metricsScope;
-        this.name = name;
-        this.streamsMetrics = streamsMetrics;
-    }
-
-    public void add(final MeteredIterator iterator) {
-        openIterators.add(iterator);
-        numOpenIterators.increment();
-        updateOldestStartTimestamp();
-
-        if (numOpenIterators.intValue() == 1) {
-            metricName = 
StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), metricsScope, 
name, streamsMetrics,
-                (config, now) -> oldestStartTimestamp.get()
-            );
-        }
-    }
-
-    public void remove(final MeteredIterator iterator) {
-        if (numOpenIterators.intValue() == 1) {
-            streamsMetrics.removeMetric(metricName);
-            streamsMetrics.removeStoreLevelMetric(metricName);
-        }
-        numOpenIterators.decrement();
-        openIterators.remove(iterator);
-        updateOldestStartTimestamp();
-    }
-
-    public long sum() {
-        return numOpenIterators.sum();
-    }
-
-    private void updateOldestStartTimestamp() {
-        final Iterator<MeteredIterator> openIteratorsIterator = 
openIterators.iterator();
-        if (openIteratorsIterator.hasNext()) {
-            
oldestStartTimestamp.set(openIteratorsIterator.next().startTimestamp());
-        }
-    }
-}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 32d1ee91437..c2268f3269a 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -23,7 +23,6 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.ProcessorStateException;
-import org.apache.kafka.streams.internals.metrics.OpenIterators;
 import org.apache.kafka.streams.kstream.internals.Change;
 import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
 import org.apache.kafka.streams.processor.StateStore;
@@ -49,9 +48,15 @@ import 
org.apache.kafka.streams.state.internals.StoreQueryUtils.QueryHandler;
 import org.apache.kafka.streams.state.internals.metrics.StateStoreMetrics;
 
 import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -92,7 +97,9 @@ public class MeteredKeyValueStore<K, V>
     private StreamsMetricsImpl streamsMetrics;
     private TaskId taskId;
 
-    protected OpenIterators openIterators;
+    protected LongAdder numOpenIterators = new LongAdder();
+    protected NavigableSet<MeteredIterator> openIterators = new 
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
+
 
     @SuppressWarnings("rawtypes")
     private final Map<Class, QueryHandler> queryHandlers =
@@ -148,8 +155,16 @@ public class MeteredKeyValueStore<K, V>
         e2eLatencySensor = 
StateStoreMetrics.e2ELatencySensor(taskId.toString(), metricsScope, name(), 
streamsMetrics);
         iteratorDurationSensor = 
StateStoreMetrics.iteratorDurationSensor(taskId.toString(), metricsScope, 
name(), streamsMetrics);
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> openIterators.sum());
-        openIterators = new OpenIterators(taskId, metricsScope, name(), 
streamsMetrics);
+                (config, now) -> numOpenIterators.sum());
+        StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
+            (config, now) -> {
+                try {
+                    final Iterator<MeteredIterator> iter = 
openIterators.iterator();
+                    return iter.hasNext() ? iter.next().startTimestamp() : 0L;
+                } catch (final NoSuchElementException e) {
+                    return 0L;
+                }
+            });
     }
 
     protected Serde<V> prepareValueSerdeForStore(final Serde<V> valueSerde, 
final SerdeGetter getter) {
@@ -435,6 +450,7 @@ public class MeteredKeyValueStore<K, V>
             this.sensor = sensor;
             this.startTimestamp = time.milliseconds();
             this.startNs = time.nanoseconds();
+            numOpenIterators.increment();
             openIterators.add(this);
         }
 
@@ -464,6 +480,7 @@ public class MeteredKeyValueStore<K, V>
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
+                numOpenIterators.decrement();
                 openIterators.remove(this);
             }
         }
@@ -492,6 +509,7 @@ public class MeteredKeyValueStore<K, V>
             this.valueDeserializer = valueDeserializer;
             this.startTimestamp = time.milliseconds();
             this.startNs = time.nanoseconds();
+            numOpenIterators.increment();
             openIterators.add(this);
         }
 
@@ -521,6 +539,7 @@ public class MeteredKeyValueStore<K, V>
                 final long duration = time.nanoseconds() - startNs;
                 sensor.record(duration);
                 iteratorDurationSensor.record(duration);
+                numOpenIterators.decrement();
                 openIterators.remove(this);
             }
         }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
index b27e6a78d86..b6787a311e7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredMultiVersionedKeyQueryIterator.java
@@ -18,10 +18,11 @@ package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.streams.internals.metrics.OpenIterators;
 import org.apache.kafka.streams.state.VersionedRecord;
 import org.apache.kafka.streams.state.VersionedRecordIterator;
 
+import java.util.Set;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Function;
 
 class MeteredMultiVersionedKeyQueryIterator<V> implements 
VersionedRecordIterator<V>, MeteredIterator {
@@ -32,20 +33,24 @@ class MeteredMultiVersionedKeyQueryIterator<V> implements 
VersionedRecordIterato
     private final Time time;
     private final long startNs;
     private final long startTimestampMs;
-    private final OpenIterators openIterators;
+    private final Set<MeteredIterator> openIterators;
+    private final LongAdder numOpenIterators;
 
     public MeteredMultiVersionedKeyQueryIterator(final 
VersionedRecordIterator<byte[]> iterator,
                                                  final Sensor sensor,
                                                  final Time time,
                                                  final 
Function<VersionedRecord<byte[]>, VersionedRecord<V>> deserializeValue,
-                                                 final OpenIterators 
openIterators) {
+                                                 final LongAdder 
numOpenIterators,
+                                                 final Set<MeteredIterator> 
openIterators) {
         this.iterator = iterator;
         this.deserializeValue = deserializeValue;
+        this.numOpenIterators = numOpenIterators;
         this.openIterators = openIterators;
         this.sensor = sensor;
         this.time = time;
         this.startNs = time.nanoseconds();
         this.startTimestampMs = time.milliseconds();
+        numOpenIterators.increment();
         openIterators.add(this);
     }
 
@@ -60,6 +65,7 @@ class MeteredMultiVersionedKeyQueryIterator<V> implements 
VersionedRecordIterato
             iterator.close();
         } finally {
             sensor.record(time.nanoseconds() - startNs);
+            numOpenIterators.decrement();
             openIterators.remove(this);
         }
     }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
index 6afb4d1531d..66eb3206def 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStore.java
@@ -269,6 +269,7 @@ public class MeteredVersionedKeyValueStore<K, V>
                             iteratorDurationSensor,
                             time,
                             StoreQueryUtils.deserializeValue(plainValueSerdes),
+                            numOpenIterators,
                             openIterators
                         );
                 final QueryResult<MeteredMultiVersionedKeyQueryIterator<V>> 
typedQueryResult =
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
index bb60c304684..cfaece063e4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
@@ -467,7 +467,7 @@ public class StateStoreMetrics {
                 storeName,
                 OLDEST_ITERATOR_OPEN_SINCE_MS,
                 OLDEST_ITERATOR_OPEN_SINCE_MS_DESCRIPTION,
-                RecordingLevel.INFO,
+                RecordingLevel.DEBUG,
                 oldestOpenIteratorGauge
         );
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
deleted file mode 100644
index daaacb7bec6..00000000000
--- 
a/streams/src/test/java/org/apache/kafka/streams/internals/metrics/OpenIteratorsTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.internals.metrics;
-
-import org.apache.kafka.common.metrics.Gauge;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
-import org.apache.kafka.streams.state.internals.MeteredIterator;
-
-import org.junit.jupiter.api.Test;
-import org.mockito.ArgumentCaptor;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.verify;
-
-public class OpenIteratorsTest {
-
-    private final StreamsMetricsImpl streamsMetrics = 
mock(StreamsMetricsImpl.class);
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldCalculateOldestStartTimestampCorrectly() {
-        final OpenIterators openIterators = new OpenIterators(new TaskId(0, 
0), "scope", "name", streamsMetrics);
-
-        final MeteredIterator meteredIterator1 = () -> 5;
-        final MeteredIterator meteredIterator2 = () -> 2;
-        final MeteredIterator meteredIterator3 = () -> 6;
-
-        openIterators.add(meteredIterator1);
-        final ArgumentCaptor<Gauge<Long>> gaugeCaptor = 
ArgumentCaptor.forClass(Gauge.class);
-        verify(streamsMetrics).addStoreLevelMutableMetric(any(), any(), any(), 
any(), any(), any(), gaugeCaptor.capture());
-        final Gauge<Long> gauge = gaugeCaptor.getValue();
-        assertThat(gauge.value(null, 0), is(5L));
-        reset(streamsMetrics);
-
-        openIterators.add(meteredIterator2);
-        verify(streamsMetrics, never()).addStoreLevelMutableMetric(any(), 
any(), any(), any(), any(), any(), gaugeCaptor.capture());
-        assertThat(gauge.value(null, 0), is(2L));
-
-        openIterators.remove(meteredIterator2);
-        verify(streamsMetrics, never()).removeStoreLevelMetric(any());
-        assertThat(gauge.value(null, 0), is(5L));
-
-        openIterators.remove(meteredIterator1);
-        verify(streamsMetrics).removeStoreLevelMetric(any());
-        assertThat(gauge.value(null, 0), is(5L));
-
-        openIterators.add(meteredIterator3);
-        verify(streamsMetrics).addStoreLevelMutableMetric(any(), any(), any(), 
any(), any(), any(), gaugeCaptor.capture());
-        assertThat(gaugeCaptor.getValue(), not(gauge));
-        assertThat(gaugeCaptor.getValue().value(null, 0), is(6L));
-    }
-}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 1a85901ccc4..1a6560f5f40 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -525,16 +525,13 @@ public class MeteredKeyValueStoreTest {
         when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
         init();
 
-        KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, nullValue());
+        final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
         KeyValueIterator<String, String> second = null;
         final long secondTimestamp;
         try {
             try (final KeyValueIterator<String, String> unused = 
metered.all()) {
-                oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-                assertThat(oldestIteratorTimestampMetric, not(nullValue()));
-
                 final long oldestTimestamp = mockTime.milliseconds();
                 assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
                 mockTime.sleep(100);
@@ -553,9 +550,8 @@ public class MeteredKeyValueStoreTest {
                 second.close();
             }
         }
-
-        oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, nullValue());
+        // no open iterators left, timestamp should be reset to 0
+        assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(0L));
     }
 
     private KafkaMetric metric(final MetricName metricName) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
index 9b5d33db966..0f3f303c8a7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreTest.java
@@ -503,15 +503,13 @@ public class MeteredTimestampedKeyValueStoreTest {
         when(inner.all()).thenReturn(KeyValueIterators.emptyIterator());
         init();
 
-        KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, nullValue());
+        final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
         KeyValueIterator<String, ValueAndTimestamp<String>> second = null;
         final long secondTimestamp;
         try {
             try (final KeyValueIterator<String, ValueAndTimestamp<String>> 
unused = metered.all()) {
-                oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-                assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
                 final long oldestTimestamp = mockTime.milliseconds();
                 assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
@@ -531,8 +529,7 @@ public class MeteredTimestampedKeyValueStoreTest {
                 second.close();
             }
         }
-
-        oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, nullValue());
+        // now that all iterators are closed, the metric should be zero
+        assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(0L));
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
index d40f6947480..f3676377c44 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredVersionedKeyValueStoreTest.java
@@ -426,16 +426,14 @@ public class MeteredVersionedKeyValueStoreTest {
         when(inner.query(any(), any(), any())).thenReturn(
                 QueryResult.forResult(new 
LogicalSegmentIterator(Collections.emptyListIterator(), RAW_KEY, 0L, 0L, 
ResultOrder.ANY)));
 
-        KafkaMetric oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, nullValue());
+        final KafkaMetric oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
+        assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
         final QueryResult<VersionedRecordIterator<String>> first = 
store.query(query, bound, config);
         VersionedRecordIterator<String> secondIterator = null;
         final long secondTime;
         try {
             try (final VersionedRecordIterator<String> unused = 
first.getResult()) {
-                oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
-                assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
                 final long oldestTimestamp = mockTime.milliseconds();
                 assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(oldestTimestamp));
@@ -457,9 +455,8 @@ public class MeteredVersionedKeyValueStoreTest {
                 secondIterator.close();
             }
         }
-
-        oldestIteratorTimestampMetric = 
getMetric("oldest-iterator-open-since-ms");
-        assertThat(oldestIteratorTimestampMetric, nullValue());
+        // no open iterators left, timestamp should be reset to 0
+        assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(0L));
     }
 
     private KafkaMetric getMetric(final String name) {

Reply via email to