mjsax commented on code in PR #21736:
URL: https://github.com/apache/kafka/pull/21736#discussion_r2934531784


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -343,12 +396,23 @@ public long startTimestamp() {
 
         @Override
         public boolean hasNext() {
-            return iter.hasNext();
+            return cachedNext != null || iter.hasNext();
         }
 
         @Override
         public KeyValue<K, V> next() {
+            if (cachedNext != null) {
+                final KeyValue<K, V> result = cachedNext;
+                cachedNext = null;
+                return result;
+            }
+
             final KeyValue<Bytes, byte[]> keyValue = iter.next();
+
+            if (keyValue == null) {

Review Comment:
   We don't have this guard elsewhere -- why are we adding it? I believe the 
contract of the `Iterator` interface is, that if `hasNext()` returns `false`, 
it's invalid to call `next()` and thus it's ok (and even "correct") to crash 
instead of returning `null` ?
   
   I just went back to `MeteredTimestampedWindowStoreWithHeaders` and it seems 
we also added this check there -- I think we should also remove it there (we 
can just do it on this PR)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -383,12 +447,83 @@ public void close() {
 
         @Override
         public K peekNextKey() {
-            return serdes.keyFrom(iter.peekNextKey().get(), new 
RecordHeaders());
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext == null ? null : cachedNext.key;

Review Comment:
   Do we need `cachedNext == null` check? I think, we know that it's never 
`null` here, and if it would be `null`, it implies `hasNext()` did return 
`false`, and it's a user error to call `peekNextKey` and we should rather crash 
to surface the bug in the user code?
   
   We should also update `MeteredTimestampedWindowStoreWithHeaders` accordingly



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -383,12 +447,83 @@ public void close() {
 
         @Override
         public K peekNextKey() {
-            return serdes.keyFrom(iter.peekNextKey().get(), new 
RecordHeaders());
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext == null ? null : cachedNext.key;
+        }
+    }
+
+    private class MeteredValueTimestampHeadersIterator implements 
KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final long startTimestampMs;
+        private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
+
+        private MeteredValueTimestampHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                     final Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+            this.startTimestampMs = time.milliseconds();
+            numOpenIterators.increment();
+            openIterators.add(this);
+        }
+
+        @Override
+        public long startTimestamp() {
+            return startTimestampMs;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cachedNext != null || iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, ValueTimestampHeaders<V>> next() {
+            if (cachedNext != null) {
+                final KeyValue<K, ValueTimestampHeaders<V>> result = 
cachedNext;
+                cachedNext = null;
+                return result;
+            }
+
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+
+            if (keyValue == null) {
+                return null;
+            }
+
+            final ValueTimestampHeaders<V> valueTimestampHeaders = 
serdes.valueFrom(keyValue.value, new RecordHeaders());
+            final Headers headers = valueTimestampHeaders != null ? 
valueTimestampHeaders.headers() : new RecordHeaders();
+            final K key = serdes.keyFrom(keyValue.key.get(), headers);
+            return KeyValue.pair(key, valueTimestampHeaders);
+        }
+
+        @Override
+        public void close() {
+            try {
+                iter.close();
+            } finally {
+                final long duration = time.nanoseconds() - startNs;
+                sensor.record(duration);
+                iteratorDurationSensor.record(duration);
+                numOpenIterators.decrement();
+                openIterators.remove(this);
+            }
+        }
+
+        @Override
+        public K peekNextKey() {
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext == null ? null : cachedNext.key;

Review Comment:
   same



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java:
##########
@@ -497,4 +498,259 @@ private KafkaMetric metric(final String name) {
     private KafkaMetric metric(final MetricName metricName) {
         return this.metrics.metric(metricName);
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseHeadersFromValueToDeserializeKeyInRange() {
+        setUp();
+
+        final Serde<String> keySerde = mock(Serde.class);
+        final Serializer<String> keySerializer = mock(Serializer.class);
+        final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+        final Serde<ValueTimestampHeaders<String>> valueSerde = 
mock(Serde.class);
+        final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = 
mock(Deserializer.class);
+
+        lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+        lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+        
lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+        lenient().when(keySerializer.serialize(any(), 
any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+        lenient().when(valueDeserializer.deserialize(any(), 
any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+            .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+        lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), 
eq(KEY.getBytes())))
+            .thenReturn(KEY);
+
+        final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, 
VALUE_TIMESTAMP_HEADERS_BYTES);
+
+        when(inner.range(any(Bytes.class), any(Bytes.class)))
+            .thenReturn(new 
KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+        metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+            inner,
+            STORE_TYPE,
+            new MockTime(),
+            keySerde,
+            valueSerde
+        );
+        metered.init(context, metered);
+
+        final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator 
= metered.range("a", "z");
+
+        assertTrue(iterator.hasNext());
+        final KeyValue<String, ValueTimestampHeaders<String>> result = 
iterator.next();

Review Comment:
   Should we insert a `assertEquals(KEY, iterator.peekNextKey());` before this 
call?
   
   If yes, should add this to all test in the PR, and also add to corresponding 
exiting tests for window-header store case?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -383,12 +447,83 @@ public void close() {
 
         @Override
         public K peekNextKey() {
-            return serdes.keyFrom(iter.peekNextKey().get(), new 
RecordHeaders());
+            if (cachedNext == null) {
+                cachedNext = next();
+            }
+            return cachedNext == null ? null : cachedNext.key;
+        }
+    }
+
+    private class MeteredValueTimestampHeadersIterator implements 
KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+        private final KeyValueIterator<Bytes, byte[]> iter;
+        private final Sensor sensor;
+        private final long startNs;
+        private final long startTimestampMs;
+        private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
+
+        private MeteredValueTimestampHeadersIterator(final 
KeyValueIterator<Bytes, byte[]> iter,
+                                                     final Sensor sensor) {
+            this.iter = iter;
+            this.sensor = sensor;
+            this.startNs = time.nanoseconds();
+            this.startTimestampMs = time.milliseconds();
+            numOpenIterators.increment();
+            openIterators.add(this);
+        }
+
+        @Override
+        public long startTimestamp() {
+            return startTimestampMs;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return cachedNext != null || iter.hasNext();
+        }
+
+        @Override
+        public KeyValue<K, ValueTimestampHeaders<V>> next() {
+            if (cachedNext != null) {
+                final KeyValue<K, ValueTimestampHeaders<V>> result = 
cachedNext;
+                cachedNext = null;
+                return result;
+            }
+
+            final KeyValue<Bytes, byte[]> keyValue = iter.next();
+
+            if (keyValue == null) {

Review Comment:
   Same



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to