This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new cb8607579be KAFKA-19959: Update session and window store with new 
default when no open iterators (#21120)
cb8607579be is described below

commit cb8607579be49f56ed4a8b917261f7c3d9398abf
Author: Bill Bejeck <[email protected]>
AuthorDate: Thu Dec 11 16:43:02 2025 -0500

    KAFKA-19959: Update session and window store with new default when no open 
iterators (#21120)
    
    When applying the NPE fix for the `oldest-iterator-open-since-ms` metric
    we only applied this to the `MeteredKeyValueStore`. This fix needs to be
    applied to `MeteredSessionStore` and `MeteredWindowStore` as well.
    
    Reviewers: Matthias Sax <[email protected]>
---
 .../kafka/streams/state/internals/MeteredSessionStore.java       | 7 ++++++-
 .../apache/kafka/streams/state/internals/MeteredWindowStore.java | 9 +++++++--
 .../kafka/streams/state/internals/MeteredKeyValueStoreTest.java  | 6 ++++--
 .../kafka/streams/state/internals/MeteredSessionStoreTest.java   | 6 +++---
 .../kafka/streams/state/internals/MeteredWindowStoreTest.java    | 6 +++---
 5 files changed, 23 insertions(+), 11 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 546959a9269..a34ccfa79cb 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -47,6 +47,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -126,9 +127,13 @@ public class MeteredSessionStore<K, V>
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> {
+                try {
                     final Iterator<MeteredIterator> openIteratorsIterator = 
openIterators.iterator();
-                    return openIteratorsIterator.hasNext() ? 
openIteratorsIterator.next().startTimestamp() : null;
+                    return openIteratorsIterator.hasNext() ? 
openIteratorsIterator.next().startTimestamp() : 0L;
+                } catch (final NoSuchElementException e) {
+                    return 0L;
                 }
+            }
         );
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 783c16b2f4f..c43d5613c69 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -50,6 +50,7 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.LongAdder;
@@ -143,10 +144,14 @@ public class MeteredWindowStore<K, V>
         StateStoreMetrics.addNumOpenIteratorsGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
                 (config, now) -> numOpenIterators.sum());
         StateStoreMetrics.addOldestOpenIteratorGauge(taskId.toString(), 
metricsScope, name(), streamsMetrics,
-                (config, now) -> {
+            (config, now) -> {
+                try {
                     final Iterator<MeteredIterator> openIteratorsIterator = 
openIterators.iterator();
-                    return openIteratorsIterator.hasNext() ? 
openIteratorsIterator.next().startTimestamp() : null;
+                    return openIteratorsIterator.hasNext() ? 
openIteratorsIterator.next().startTimestamp() : 0L;
+                } catch (final NoSuchElementException e) {
+                    return 0L;
                 }
+            }
         );
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
index 1a6560f5f40..7e1c72fd7e4 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStoreTest.java
@@ -528,6 +528,8 @@ public class MeteredKeyValueStoreTest {
         final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
         assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
+
         KeyValueIterator<String, String> second = null;
         final long secondTimestamp;
         try {
@@ -544,14 +546,14 @@ public class MeteredKeyValueStoreTest {
             }
 
             // now that the first iterator is closed, check that the timestamp 
has advanced to the still open second iterator
-            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
+            assertThat(oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
         } finally {
             if (second != null) {
                 second.close();
             }
         }
         // no open iterators left, timestamp should be reset to 0
-        assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(0L));
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
     private KafkaMetric metric(final MetricName metricName) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
index e34c232075e..91755a1df34 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredSessionStoreTest.java
@@ -712,7 +712,7 @@ public class MeteredSessionStoreTest {
         final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
         assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
 
         KeyValueIterator<Windowed<String>, String> second = null;
         final long secondTimestamp;
@@ -730,14 +730,14 @@ public class MeteredSessionStoreTest {
             }
 
             // now that the first iterator is closed, check that the timestamp 
has advanced to the still open second iterator
-            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
+            assertThat(oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
         } finally {
             if (second != null) {
                 second.close();
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), 
nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
     private KafkaMetric metric(final String name) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index d0efdb8f4b7..c87671fd373 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -494,7 +494,7 @@ public class MeteredWindowStoreTest {
         final KafkaMetric oldestIteratorTimestampMetric = 
metric("oldest-iterator-open-since-ms");
         assertThat(oldestIteratorTimestampMetric, not(nullValue()));
 
-        assertThat(oldestIteratorTimestampMetric.metricValue(), nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
 
         KeyValueIterator<Windowed<String>, String> second = null;
         final long secondTimestamp;
@@ -512,14 +512,14 @@ public class MeteredWindowStoreTest {
             }
 
             // now that the first iterator is closed, check that the timestamp 
has advanced to the still open second iterator
-            assertThat((Long) oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
+            assertThat(oldestIteratorTimestampMetric.metricValue(), 
equalTo(secondTimestamp));
         } finally {
             if (second != null) {
                 second.close();
             }
         }
 
-        assertThat((Integer) oldestIteratorTimestampMetric.metricValue(), 
nullValue());
+        assertThat(oldestIteratorTimestampMetric.metricValue(), equalTo(0L));
     }
 
     private KafkaMetric metric(final String name) {

Reply via email to