mjsax commented on code in PR #21736:
URL: https://github.com/apache/kafka/pull/21736#discussion_r2934531784
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -343,12 +396,23 @@ public long startTimestamp() {
@Override
public boolean hasNext() {
- return iter.hasNext();
+ return cachedNext != null || iter.hasNext();
}
@Override
public KeyValue<K, V> next() {
+ if (cachedNext != null) {
+ final KeyValue<K, V> result = cachedNext;
+ cachedNext = null;
+ return result;
+ }
+
final KeyValue<Bytes, byte[]> keyValue = iter.next();
+
+ if (keyValue == null) {
Review Comment:
We don't have this guard elsewhere -- why are we adding it? I believe the
contract of the `Iterator` interface is that if `hasNext()` returns `false`,
it's invalid to call `next()`, and thus it's ok (and even "correct") to crash
instead of returning `null`.
I just went back to `MeteredTimestampedWindowStoreWithHeaders` and it seems
we also added this check there -- I think we should remove it there, too (we
can just do it in this PR).
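For reference, a minimal standalone sketch (plain JDK code, not the PR's classes) of the `Iterator` contract this comment relies on:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Minimal sketch (plain JDK, not the PR's classes): once hasNext() returns
// false, next() throws NoSuchElementException -- it never returns null, so
// a null guard on next()'s result is dead code.
public final class IteratorContractDemo {
    public static void main(final String[] args) {
        final Iterator<String> iter = List.of("a").iterator();

        System.out.println(iter.next());    // prints "a"
        System.out.println(iter.hasNext()); // prints "false"

        try {
            iter.next(); // invalid call: the iterator is exhausted
        } catch (final NoSuchElementException e) {
            System.out.println("crashed, as the contract requires");
        }
    }
}
```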
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -383,12 +447,83 @@ public void close() {
@Override
public K peekNextKey() {
- return serdes.keyFrom(iter.peekNextKey().get(), new RecordHeaders());
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext == null ? null : cachedNext.key;
Review Comment:
Do we need the `cachedNext == null` check? I think we know that it's never
`null` here; if it were `null`, that would imply `hasNext()` returned
`false`, and it's a user error to call `peekNextKey` in that case -- we
should rather crash to surface the bug in the user code?
We should also update `MeteredTimestampedWindowStoreWithHeaders` accordingly
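A hypothetical sketch of the suggested simplification (a generic caching iterator as a stand-in for the store code, assuming the underlying iterator never yields `null` elements): dropping the `null` check makes an out-of-contract `peek` call fail fast via `next()`'s `NoSuchElementException`:

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical generic caching iterator (stand-in for the store iterator):
// peek() caches the next element without consuming it, and deliberately has
// no null check -- an exhausted iterator crashes inside next() instead.
public final class CachingIterator<T> implements Iterator<T> {
    private final Iterator<T> inner;
    private T cachedNext; // non-null only while a peeked element is pending

    public CachingIterator(final Iterator<T> inner) {
        this.inner = inner;
    }

    @Override
    public boolean hasNext() {
        return cachedNext != null || inner.hasNext();
    }

    @Override
    public T next() {
        if (cachedNext != null) {
            final T result = cachedNext;
            cachedNext = null;
            return result;
        }
        return inner.next(); // throws NoSuchElementException when exhausted
    }

    public T peek() {
        if (cachedNext == null) {
            cachedNext = next(); // crashes here on an out-of-contract call
        }
        return cachedNext; // known non-null: next() either returned or threw
    }

    public static void main(final String[] args) {
        final CachingIterator<String> iter =
            new CachingIterator<>(List.of("key1").iterator());
        System.out.println(iter.peek());    // prints "key1", does not consume
        System.out.println(iter.next());    // prints "key1"
        System.out.println(iter.hasNext()); // prints "false"
    }
}
```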
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -383,12 +447,83 @@ public void close() {
@Override
public K peekNextKey() {
- return serdes.keyFrom(iter.peekNextKey().get(), new RecordHeaders());
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext == null ? null : cachedNext.key;
+ }
+ }
+
+ private class MeteredValueTimestampHeadersIterator implements KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+ private final KeyValueIterator<Bytes, byte[]> iter;
+ private final Sensor sensor;
+ private final long startNs;
+ private final long startTimestampMs;
+ private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
+
+ private MeteredValueTimestampHeadersIterator(final KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ this.startTimestampMs = time.milliseconds();
+ numOpenIterators.increment();
+ openIterators.add(this);
+ }
+
+ @Override
+ public long startTimestamp() {
+ return startTimestampMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cachedNext != null || iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, ValueTimestampHeaders<V>> next() {
+ if (cachedNext != null) {
+ final KeyValue<K, ValueTimestampHeaders<V>> result = cachedNext;
+ cachedNext = null;
+ return result;
+ }
+
+ final KeyValue<Bytes, byte[]> keyValue = iter.next();
+
+ if (keyValue == null) {
+ return null;
+ }
+
+ final ValueTimestampHeaders<V> valueTimestampHeaders = serdes.valueFrom(keyValue.value, new RecordHeaders());
+ final Headers headers = valueTimestampHeaders != null ? valueTimestampHeaders.headers() : new RecordHeaders();
+ final K key = serdes.keyFrom(keyValue.key.get(), headers);
+ return KeyValue.pair(key, valueTimestampHeaders);
+ }
+
+ @Override
+ public void close() {
+ try {
+ iter.close();
+ } finally {
+ final long duration = time.nanoseconds() - startNs;
+ sensor.record(duration);
+ iteratorDurationSensor.record(duration);
+ numOpenIterators.decrement();
+ openIterators.remove(this);
+ }
+ }
+
+ @Override
+ public K peekNextKey() {
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext == null ? null : cachedNext.key;
Review Comment:
same
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeadersTest.java:
##########
@@ -497,4 +498,259 @@ private KafkaMetric metric(final String name) {
private KafkaMetric metric(final MetricName metricName) {
return this.metrics.metric(metricName);
}
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseHeadersFromValueToDeserializeKeyInRange() {
+ setUp();
+
+ final Serde<String> keySerde = mock(Serde.class);
+ final Serializer<String> keySerializer = mock(Serializer.class);
+ final Deserializer<String> keyDeserializer = mock(Deserializer.class);
+ final Serde<ValueTimestampHeaders<String>> valueSerde = mock(Serde.class);
+ final Deserializer<ValueTimestampHeaders<String>> valueDeserializer = mock(Deserializer.class);
+
+ lenient().when(keySerde.deserializer()).thenReturn(keyDeserializer);
+ lenient().when(keySerde.serializer()).thenReturn(keySerializer);
+ lenient().when(valueSerde.deserializer()).thenReturn(valueDeserializer);
+
+ lenient().when(keySerializer.serialize(any(), any(RecordHeaders.class), any())).thenReturn(KEY.getBytes());
+
+ lenient().when(valueDeserializer.deserialize(any(), any(RecordHeaders.class), eq(VALUE_TIMESTAMP_HEADERS_BYTES)))
+ .thenReturn(VALUE_TIMESTAMP_HEADERS);
+
+ lenient().when(keyDeserializer.deserialize(any(), eq(HEADERS), eq(KEY.getBytes())))
+ .thenReturn(KEY);
+
+ final KeyValue<Bytes, byte[]> testData = KeyValue.pair(KEY_BYTES, VALUE_TIMESTAMP_HEADERS_BYTES);
+
+ when(inner.range(any(Bytes.class), any(Bytes.class)))
+ .thenReturn(new KeyValueIteratorStub<>(List.of(testData).iterator()));
+
+ metered = new MeteredTimestampedKeyValueStoreWithHeaders<>(
+ inner,
+ STORE_TYPE,
+ new MockTime(),
+ keySerde,
+ valueSerde
+ );
+ metered.init(context, metered);
+
+ final KeyValueIterator<String, ValueTimestampHeaders<String>> iterator = metered.range("a", "z");
+
+ assertTrue(iterator.hasNext());
+ final KeyValue<String, ValueTimestampHeaders<String>> result = iterator.next();
Review Comment:
Should we insert an `assertEquals(KEY, iterator.peekNextKey());` before this
call?
If yes, we should add this to all tests in the PR, and also to the
corresponding existing tests for the window-header store case.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -383,12 +447,83 @@ public void close() {
@Override
public K peekNextKey() {
- return serdes.keyFrom(iter.peekNextKey().get(), new RecordHeaders());
+ if (cachedNext == null) {
+ cachedNext = next();
+ }
+ return cachedNext == null ? null : cachedNext.key;
+ }
+ }
+
+ private class MeteredValueTimestampHeadersIterator implements KeyValueIterator<K, ValueTimestampHeaders<V>>, MeteredIterator {
+ private final KeyValueIterator<Bytes, byte[]> iter;
+ private final Sensor sensor;
+ private final long startNs;
+ private final long startTimestampMs;
+ private KeyValue<K, ValueTimestampHeaders<V>> cachedNext;
+
+ private MeteredValueTimestampHeadersIterator(final KeyValueIterator<Bytes, byte[]> iter,
+ final Sensor sensor) {
+ this.iter = iter;
+ this.sensor = sensor;
+ this.startNs = time.nanoseconds();
+ this.startTimestampMs = time.milliseconds();
+ numOpenIterators.increment();
+ openIterators.add(this);
+ }
+
+ @Override
+ public long startTimestamp() {
+ return startTimestampMs;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return cachedNext != null || iter.hasNext();
+ }
+
+ @Override
+ public KeyValue<K, ValueTimestampHeaders<V>> next() {
+ if (cachedNext != null) {
+ final KeyValue<K, ValueTimestampHeaders<V>> result = cachedNext;
+ cachedNext = null;
+ return result;
+ }
+
+ final KeyValue<Bytes, byte[]> keyValue = iter.next();
+
+ if (keyValue == null) {
Review Comment:
Same
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]