This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b2dd7df1591 KAFKA-20202: Verify existing IQv2 methods for
MeteredTimestampedWindowStoreWithHeaders (#21650)
b2dd7df1591 is described below
commit b2dd7df15913be5cd1e7b7b01512c8663335140a
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 9 18:40:41 2026 +0100
KAFKA-20202: Verify existing IQv2 methods for
MeteredTimestampedWindowStoreWithHeaders (#21650)
This PR does NOT implement header-aware IQv2 queries for the headers
store. Instead, it provides the existing IQv2 queries for the headers
store by serving them through the timestamped state stores. As a result,
the query results do not contain the headers, even though the headers
are preserved in the headers state store.
Part of KIP-1271.
Reviewers: Matthias J. Sax <[email protected]>
---
.../MeteredTimestampedWindowStoreWithHeaders.java | 210 +++++++++++++
.../state/internals/MeteredWindowStore.java | 10 +-
.../RocksDBTimestampedWindowStoreWithHeaders.java | 26 ++
.../TimestampedToHeadersWindowStoreAdapter.java | 43 ++-
.../TimestampedWindowStoreWithHeadersBuilder.java | 11 -
.../state/internals/InMemoryWindowStoreTest.java | 226 ++++++++++++++
...cksDBTimestampedWindowStoreWithHeadersTest.java | 176 +++++++++++
...TimestampedToHeadersWindowStoreAdapterTest.java | 322 ++++++++++++++++++++
...mestampedWindowStoreWithHeadersBuilderTest.java | 325 +++++++++++----------
9 files changed, 1172 insertions(+), 177 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
index e982365da17..b950184fb26 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStoreWithHeaders.java
@@ -22,12 +22,26 @@ import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStore;
+import org.apache.kafka.streams.state.WindowStoreIterator;
import java.util.Objects;
+import java.util.function.Function;
import static
org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
@@ -83,4 +97,200 @@ class MeteredTimestampedWindowStoreWithHeaders<K, V>
protected Bytes keyBytes(final K key, final Headers headers) {
return Bytes.wrap(serdes.rawKey(key, headers));
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+
+ if (query instanceof WindowKeyQuery) {
+ result = runWindowKeyQuery((WindowKeyQuery<K,
ValueTimestampHeaders<V>>) query, positionBound, config);
+ } else if (query instanceof WindowRangeQuery) {
+ result = runWindowRangeQuery((WindowRangeQuery<K,
ValueTimestampHeaders<V>>) query, positionBound, config);
+ } else {
+ result = wrapped().query(query, positionBound, config);
+ }
+
+ if (config.isCollectExecutionInfo()) {
+ final String conversionType = isUnderlyingStoreTimestamped()
+ ? "with conversion to ValueAndTimestamp"
+ : "with extraction of plain values";
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " " + conversionType + " in "
+ + (time.nanoseconds() - start) + "ns");
+ }
+ return result;
+ }
+
+ /**
+ * Handles WindowKeyQuery by creating a MeteredWindowStoreIterator with
conversion from
+ * ValueTimestampHeaders to either ValueAndTimestamp<V> (for timestamped
stores) or V (for non-timestamped stores).
+ */
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runWindowKeyQuery(final WindowKeyQuery<K,
ValueTimestampHeaders<V>> query,
+ final PositionBound
positionBound,
+ final QueryConfig config) {
+ final QueryResult<R> queryResult;
+ if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
+ final WindowKeyQuery<Bytes, byte[]> rawKeyQuery =
+ WindowKeyQuery.withKeyAndWindowStartRange(
+ keyBytes(query.getKey(), new RecordHeaders()),
+ query.getTimeFrom().get(),
+ query.getTimeTo().get()
+ );
+ final QueryResult<WindowStoreIterator<byte[]>> rawResult =
wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ if (isUnderlyingStoreTimestamped()) {
+ // For timestamped stores, return ValueAndTimestamp<V>
+ final Function<byte[], ValueAndTimestamp<V>> valueFrom =
bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+ };
+
+ final MeteredWindowStoreIterator<ValueAndTimestamp<V>>
typedResult =
+ new MeteredWindowStoreIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ valueFrom,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ final
QueryResult<MeteredWindowStoreIterator<ValueAndTimestamp<V>>> typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+ queryResult = (QueryResult<R>) typedQueryResult;
+ } else {
+ // For non-timestamped stores, return plain V
+ final Function<byte[], V> valueFrom = bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null : vth.value();
+ };
+
+ final MeteredWindowStoreIterator<V> typedResult =
+ new MeteredWindowStoreIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ valueFrom,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ final QueryResult<MeteredWindowStoreIterator<V>>
typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+ queryResult = (QueryResult<R>) typedQueryResult;
+ }
+ } else {
+ queryResult = (QueryResult<R>) rawResult;
+ }
+ } else {
+ queryResult = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to
execute"
+ + " the given query (" + query + ") because it only
supports closed-range"
+ + " queries."
+ + " Contact the store maintainer if you need support
for a new query type."
+ );
+ }
+ return queryResult;
+ }
+
+
+ /**
+ * Handles WindowRangeQuery by creating a MeteredWindowedKeyValueIterator
with conversion from
+ * ValueTimestampHeaders to either ValueAndTimestamp<V> (for timestamped
stores) or V (for non-timestamped stores).
+ */
+ @SuppressWarnings("unchecked")
+ private <R> QueryResult<R> runWindowRangeQuery(final WindowRangeQuery<K,
ValueTimestampHeaders<V>> query,
+ final PositionBound
positionBound,
+ final QueryConfig config) {
+ final QueryResult<R> result;
+ if (query.getTimeFrom().isPresent() && query.getTimeTo().isPresent()) {
+ final WindowRangeQuery<Bytes, byte[]> rawKeyQuery =
+ WindowRangeQuery.withWindowStartRange(
+ query.getTimeFrom().get(),
+ query.getTimeTo().get()
+ );
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>
rawResult =
+ wrapped().query(rawKeyQuery, positionBound, config);
+ if (rawResult.isSuccess()) {
+ final Function<byte[], K> keyFrom = bytes ->
serdes.keyFrom(bytes, new RecordHeaders());
+
+ if (isUnderlyingStoreTimestamped()) {
+ // For timestamped stores, return ValueAndTimestamp<V>
+ final Function<byte[], ValueAndTimestamp<V>> valueFrom =
bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null :
ValueAndTimestamp.make(vth.value(), vth.timestamp());
+ };
+
+ final MeteredWindowedKeyValueIterator<K,
ValueAndTimestamp<V>> typedResult =
+ new MeteredWindowedKeyValueIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ keyFrom,
+ valueFrom,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ final QueryResult<MeteredWindowedKeyValueIterator<K,
ValueAndTimestamp<V>>> typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+ result = (QueryResult<R>) typedQueryResult;
+ } else {
+ // For non-timestamped stores, return plain V
+ final Function<byte[], V> valueFrom = bytes -> {
+ final ValueTimestampHeaders<V> vth =
serdes.valueFrom(bytes, new RecordHeaders());
+ return vth == null ? null : vth.value();
+ };
+
+ final MeteredWindowedKeyValueIterator<K, V> typedResult =
+ new MeteredWindowedKeyValueIterator<>(
+ rawResult.getResult(),
+ fetchSensor,
+ iteratorDurationSensor,
+ streamsMetrics,
+ keyFrom,
+ valueFrom,
+ time,
+ numOpenIterators,
+ openIterators
+ );
+ final QueryResult<MeteredWindowedKeyValueIterator<K, V>>
typedQueryResult =
+
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
typedResult);
+ result = (QueryResult<R>) typedQueryResult;
+ }
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+ result = QueryResult.forFailure(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ "This store (" + getClass() + ") doesn't know how to"
+ + " execute the given query (" + query + ") because"
+ + " WindowStores only supports
WindowRangeQuery.withWindowStartRange."
+ + " Contact the store maintainer if you need support
for a new query type."
+ );
+ }
+ return result;
+ }
+
+ private boolean isUnderlyingStoreTimestamped() {
+ Object store = wrapped();
+ do {
+ if (store instanceof TimestampedBytesStore
+ || store instanceof
TimestampedToHeadersWindowStoreAdapter) {
+ return true;
+ }
+ store = ((WrappedStateStore<?, ?, ?>) store).wrapped();
+ } while ((store instanceof WrappedStateStore));
+ return store instanceof TimestampedBytesStore;
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index b58cefc8dd3..db016071511 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -73,18 +73,18 @@ public class MeteredWindowStore<K, V>
private final Serde<K> keySerde;
private final Serde<V> valueSerde;
protected StateSerdes<K, V> serdes;
- private StreamsMetricsImpl streamsMetrics;
+ protected StreamsMetricsImpl streamsMetrics;
protected Sensor putSensor;
- private Sensor fetchSensor;
+ protected Sensor fetchSensor;
private Sensor flushSensor;
private Sensor e2eLatencySensor;
- private Sensor iteratorDurationSensor;
+ protected Sensor iteratorDurationSensor;
private InternalProcessorContext<?, ?> internalContext;
private TaskId taskId;
private Sensor restoreSensor;
- private final LongAdder numOpenIterators = new LongAdder();
- private final NavigableSet<MeteredIterator> openIterators = new
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
+ protected final LongAdder numOpenIterators = new LongAdder();
+ protected final NavigableSet<MeteredIterator> openIterators = new
ConcurrentSkipListSet<>(Comparator.comparingLong(MeteredIterator::startTimestamp));
@SuppressWarnings("rawtypes")
private final Map<Class, QueryHandler> queryHandlers =
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
index 5088a1ff207..37b1994b02f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
@@ -16,6 +16,11 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
@@ -47,4 +52,25 @@ class RocksDBTimestampedWindowStoreWithHeaders extends
RocksDBWindowStore implem
final long windowSize) {
super(bytesStore, retainDuplicates, windowSize);
}
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query,
+ final PositionBound positionBound,
+ final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+ final Position position = getPosition();
+
+ synchronized (position) {
+ result = QueryResult.forUnknownQueryType(query, this);
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + this.getClass() + " in " +
(System.nanoTime() - start) + "ns"
+ );
+ }
+ result.setPosition(position.copy());
+ }
+ return result;
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
index f53cbc68285..97f10285849 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapter.java
@@ -27,6 +27,9 @@ import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.query.internals.InternalQueryResultUtil;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
@@ -210,8 +213,46 @@ public class TimestampedToHeadersWindowStoreAdapter
implements WindowStore<Bytes
public <R> QueryResult<R> query(final Query<R> query,
final PositionBound positionBound,
final QueryConfig config) {
+ final long start = config.isCollectExecutionInfo() ? System.nanoTime()
: -1L;
+ final QueryResult<R> result;
+
+ // Handle WindowKeyQuery: wrap iterator to convert from timestamped to
headers format
+ if (query instanceof WindowKeyQuery) {
+ final WindowKeyQuery<Bytes, byte[]> windowKeyQuery =
(WindowKeyQuery<Bytes, byte[]>) query;
+ final QueryResult<WindowStoreIterator<byte[]>> rawResult =
store.query(windowKeyQuery, positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final WindowStoreIterator<byte[]> wrappedIterator =
+ new
TimestampedWindowToHeadersWindowStoreIteratorAdapter(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
wrappedIterator);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else if (query instanceof WindowRangeQuery) {
+ // Handle WindowRangeQuery: wrap iterator to convert values
+ final WindowRangeQuery<Bytes, byte[]> windowRangeQuery =
(WindowRangeQuery<Bytes, byte[]>) query;
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>>
rawResult =
+ store.query(windowRangeQuery, positionBound, config);
+
+ if (rawResult.isSuccess()) {
+ final KeyValueIterator<Windowed<Bytes>, byte[]>
wrappedIterator =
+ new
TimestampedToHeadersIteratorAdapter<>(rawResult.getResult());
+ result = (QueryResult<R>)
InternalQueryResultUtil.copyAndSubstituteDeserializedResult(rawResult,
wrappedIterator);
+ } else {
+ result = (QueryResult<R>) rawResult;
+ }
+ } else {
+ // For other query types, delegate to the underlying store
+ result = store.query(query, positionBound, config);
+ }
+
+ if (config.isCollectExecutionInfo()) {
+ result.addExecutionInfo(
+ "Handled in " + getClass() + " in " + (System.nanoTime() -
start) + "ns"
+ );
+ }
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped window stores with headers yet.");
+ return result;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
index 9fc363bb40e..c5957ca6748 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java
@@ -21,10 +21,6 @@ import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
-import org.apache.kafka.streams.query.PositionBound;
-import org.apache.kafka.streams.query.Query;
-import org.apache.kafka.streams.query.QueryConfig;
-import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedBytesStore;
@@ -214,13 +210,6 @@ public class TimestampedWindowStoreWithHeadersBuilder<K, V>
return wrapped().backwardAll();
}
- @Override
- public <R> QueryResult<R> query(final Query<R> query,
- final PositionBound positionBound,
- final QueryConfig config) {
- throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped window stores with headers yet.");
- }
-
@Override
public boolean persistent() {
return false;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
index 73c0335cc35..b39449cca3a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/InMemoryWindowStoreTest.java
@@ -19,20 +19,39 @@ package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
+import java.io.File;
+import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
+import java.util.Properties;
import static java.time.Duration.ofMillis;
import static org.apache.kafka.common.utils.Utils.mkEntry;
@@ -40,6 +59,8 @@ import static org.apache.kafka.common.utils.Utils.mkMap;
import static
org.apache.kafka.streams.state.internals.WindowKeySchema.toStoreKeyBinary;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
public class InMemoryWindowStoreTest extends AbstractWindowBytesStoreTest {
@@ -185,4 +206,209 @@ public class InMemoryWindowStoreTest extends
AbstractWindowBytesStoreTest {
assertEquals(expected, actual);
}
+ @Nested
+ class InMemoryStoreIQv2Tests {
+ private static final long WINDOW_SIZE = 10_000L;
+ private static final long RETENTION_PERIOD = 60_000L;
+ private static final long SEGMENT_INTERVAL = 30_000L;
+
+ private InMemoryWindowStore inMemoryStore;
+ private InternalMockProcessorContext<String, String> context;
+ private File baseDir;
+
+ @BeforeEach
+ public void setUp() {
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ baseDir = TestUtils.tempDirectory();
+ context = new InternalMockProcessorContext<>(
+ baseDir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+
+ inMemoryStore = new InMemoryWindowStore(
+ "iqv2-inmemory-test-store",
+ RETENTION_PERIOD,
+ WINDOW_SIZE,
+ false, // retainDuplicates
+ "test-metrics-scope"
+ );
+
+ inMemoryStore.init(context, inMemoryStore);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (inMemoryStore != null) {
+ inMemoryStore.close();
+ }
+ }
+
+ @Test
+ public void shouldHandleWindowKeyQuerySuccessfullyOnInMemoryStore() {
+ // Build a typed window store using timestamped window store with
headers
+ final TimestampedWindowStoreWithHeaders<String, String> typedStore
= Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.inMemoryWindowStore(
+ "typed-window-store",
+ ofMillis(RETENTION_PERIOD),
+ ofMillis(WINDOW_SIZE),
+ false),
+ Serdes.String(),
+ Serdes.String())
+ .withLoggingDisabled()
+ .build();
+
+ typedStore.init(context, typedStore);
+
+ try {
+ // Put some data into the store
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 0,
"", new RecordHeaders()));
+ typedStore.put("test-key",
ValueTimestampHeaders.make("value1", 1000L, new RecordHeaders()), 1000L);
+ context.setRecordContext(new ProcessorRecordContext(0, 2, 0,
"", new RecordHeaders()));
+ typedStore.put("test-key",
ValueTimestampHeaders.make("value2", 5000L, new RecordHeaders()), 5000L);
+
+ // Query at typed level - WindowKeyQuery should return
windowed values with timestamps
+ final WindowKeyQuery<String, ValueAndTimestamp<String>> query
= WindowKeyQuery.withKeyAndWindowStartRange(
+ "test-key",
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(10000L)
+ );
+ final
QueryResult<WindowStoreIterator<ValueAndTimestamp<String>>> result =
+ typedStore.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ // Verify IQv2 query result
+ assertTrue(result.isSuccess(), "Expected query to succeed on
in-memory store");
+ assertNotNull(result.getResult(), "Expected result iterator to
be present");
+ assertNotNull(result.getPosition(), "Expected position to be
set");
+
+ // Verify the actual query results
+ final WindowStoreIterator<ValueAndTimestamp<String>> iterator
= result.getResult();
+ assertTrue(iterator.hasNext(), "Expected at least one result");
+
+ KeyValue<Long, ValueAndTimestamp<String>> kv = iterator.next();
+ assertEquals(1000L, kv.key, "Expected first window timestamp");
+ assertEquals("value1", kv.value.value(), "WindowKeyQuery
should return the value");
+ assertEquals(1000L, kv.value.timestamp(), "WindowKeyQuery
should return the timestamp");
+
+ assertTrue(iterator.hasNext(), "Expected second result");
+ kv = iterator.next();
+ assertEquals(5000L, kv.key, "Expected second window
timestamp");
+ assertEquals("value2", kv.value.value(), "WindowKeyQuery
should return the value");
+ assertEquals(5000L, kv.value.timestamp(), "WindowKeyQuery
should return the timestamp");
+
+ assertFalse(iterator.hasNext(), "Expected no more results");
+ iterator.close();
+ } finally {
+ typedStore.close();
+ }
+ }
+
+ @Test
+ public void shouldHandleWindowRangeQuerySuccessfullyOnInMemoryStore() {
+ // Build a typed window store using timestamped window store with
headers
+ final TimestampedWindowStoreWithHeaders<String, String> typedStore
= Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.inMemoryWindowStore(
+ "typed-window-range-store",
+ ofMillis(RETENTION_PERIOD),
+ ofMillis(WINDOW_SIZE),
+ false),
+ Serdes.String(),
+ Serdes.String())
+ .withLoggingDisabled()
+ .build();
+
+ typedStore.init(context, typedStore);
+
+ try {
+ // Put some data into the store
+ context.setRecordContext(new ProcessorRecordContext(0, 1, 0,
"", new RecordHeaders()));
+ typedStore.put("key1", ValueTimestampHeaders.make("value1",
1000L, new RecordHeaders()), 1000L);
+ context.setRecordContext(new ProcessorRecordContext(0, 2, 0,
"", new RecordHeaders()));
+ typedStore.put("key2", ValueTimestampHeaders.make("value2",
5000L, new RecordHeaders()), 5000L);
+ context.setRecordContext(new ProcessorRecordContext(0, 3, 0,
"", new RecordHeaders()));
+ typedStore.put("key3", ValueTimestampHeaders.make("value3",
3000L, new RecordHeaders()), 3000L);
+
+ // Query at typed level - WindowRangeQuery should return all
windowed key-values with timestamps
+ final WindowRangeQuery<String, ValueAndTimestamp<String>>
query = WindowRangeQuery.withWindowStartRange(
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(10000L)
+ );
+ final QueryResult<KeyValueIterator<Windowed<String>,
ValueAndTimestamp<String>>> result =
+ typedStore.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ // Verify IQv2 query result
+ assertTrue(result.isSuccess(), "Expected query to succeed on
in-memory store");
+ assertNotNull(result.getResult(), "Expected result iterator to
be present");
+ assertNotNull(result.getPosition(), "Expected position to be
set");
+
+ // Verify the actual query results (should be sorted by window
timestamp)
+ final KeyValueIterator<Windowed<String>,
ValueAndTimestamp<String>> iterator = result.getResult();
+
+ assertTrue(iterator.hasNext(), "Expected at least one result");
+ KeyValue<Windowed<String>, ValueAndTimestamp<String>> kv =
iterator.next();
+ assertEquals("key1", kv.key.key(), "Expected first key");
+ assertEquals(1000L, kv.key.window().start(), "Expected first
window start");
+ assertEquals("value1", kv.value.value(), "WindowRangeQuery
should return the value");
+ assertEquals(1000L, kv.value.timestamp(), "WindowRangeQuery
should return the timestamp");
+
+ assertTrue(iterator.hasNext(), "Expected second result");
+ kv = iterator.next();
+ assertEquals("key3", kv.key.key(), "Expected second key");
+ assertEquals(3000L, kv.key.window().start(), "Expected second
window start");
+ assertEquals("value3", kv.value.value(), "WindowRangeQuery
should return the value");
+
+ assertTrue(iterator.hasNext(), "Expected third result");
+ kv = iterator.next();
+ assertEquals("key2", kv.key.key(), "Expected third key");
+ assertEquals(5000L, kv.key.window().start(), "Expected third
window start");
+ assertEquals("value2", kv.value.value(), "WindowRangeQuery
should return the value");
+
+ assertFalse(iterator.hasNext(), "Expected no more results");
+ iterator.close();
+ } finally {
+ typedStore.close();
+ }
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForInMemoryStoreWhenRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable
execution info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
inMemoryStore.query(query, positionBound, config);
+
+ // Verify: Execution info was collected
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected
execution info to be collected");
+ boolean foundInMemoryStoreInfo = false;
+ for (final String info : result.getExecutionInfo()) {
+ if (info.contains("Handled in") &&
info.contains(InMemoryWindowStore.class.getName())) {
+ foundInMemoryStoreInfo = true;
+ break;
+ }
+ }
+ assertTrue(foundInMemoryStoreInfo, "Expected execution info to
mention the in-memory store class");
+ }
+
+ @Test
+ public void
shouldNotCollectExecutionInfoForInMemoryStoreWhenNotRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false); // Disable
execution info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
inMemoryStore.query(query, positionBound, config);
+
+ // Verify: No execution info was collected
+ assertTrue(result.getExecutionInfo().isEmpty(), "Expected no
execution info to be collected");
+ }
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeadersTest.java
new file mode 100644
index 00000000000..554f5559865
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeadersTest.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.query.FailureReason;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RocksDBTimestampedWindowStoreWithHeadersTest {
+
+ private static final String STORE_NAME = "test-window-store";
+ private static final long WINDOW_SIZE = 10_000L;
+ private static final long RETENTION_PERIOD = 60_000L;
+ private static final long SEGMENT_INTERVAL = 30_000L;
+
+ private RocksDBTimestampedWindowStoreWithHeaders windowStore;
+ private InternalMockProcessorContext<String, String> context;
+ private File baseDir;
+
+ @BeforeEach
+ public void setUp() {
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ baseDir = TestUtils.tempDirectory();
+ context = new InternalMockProcessorContext<>(
+ baseDir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+
+ final SegmentedBytesStore segmentedBytesStore = new
RocksDBSegmentedBytesStore(
+ STORE_NAME,
+ "test-metrics-scope",
+ RETENTION_PERIOD,
+ SEGMENT_INTERVAL,
+ new WindowKeySchema()
+ );
+
+ windowStore = new RocksDBTimestampedWindowStoreWithHeaders(
+ segmentedBytesStore,
+ false,
+ WINDOW_SIZE
+ );
+
+ windowStore.init(context, windowStore);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (windowStore != null) {
+ windowStore.close();
+ }
+ }
+
+ @Test
+ public void shouldReturnUnknownQueryTypeForWindowKeyQuery() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
windowStore.query(query, positionBound, config);
+
+ // Verify: Window store with headers currently returns UNKNOWN_QUERY_TYPE
+ assertFalse(result.isSuccess(), "Expected query to fail with unknown query type");
+ assertEquals(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ result.getFailureReason(),
+ "Expected UNKNOWN_QUERY_TYPE failure reason"
+ );
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ }
+
+ @Test
+ public void shouldReturnUnknownQueryTypeForWindowRangeQuery() {
+ final WindowRangeQuery<Bytes, byte[]> query =
WindowRangeQuery.withWindowStartRange(
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false);
+
+ final
QueryResult<KeyValueIterator<org.apache.kafka.streams.kstream.Windowed<Bytes>,
byte[]>> result =
+ windowStore.query(query, positionBound, config);
+
+ // Verify: Window store with headers currently returns UNKNOWN_QUERY_TYPE
+ assertFalse(result.isSuccess(), "Expected query to fail with unknown query type");
+ assertEquals(
+ FailureReason.UNKNOWN_QUERY_TYPE,
+ result.getFailureReason(),
+ "Expected UNKNOWN_QUERY_TYPE failure reason"
+ );
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoWhenRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable execution info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
windowStore.query(query, positionBound, config);
+
+ // Verify: Execution info was collected
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution info to be collected");
+ assertTrue(
+ result.getExecutionInfo().get(0).contains("Handled in"),
+ "Expected execution info to contain handling information"
+ );
+ assertTrue(
+
result.getExecutionInfo().get(0).contains(RocksDBTimestampedWindowStoreWithHeaders.class.getName()),
+ "Expected execution info to mention the class name"
+ );
+ }
+
+ @Test
+ public void shouldNotCollectExecutionInfoWhenNotRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false); // Disable execution info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
windowStore.query(query, positionBound, config);
+
+ // Verify: No execution info was collected
+ assertTrue(result.getExecutionInfo().isEmpty(), "Expected no execution info to be collected");
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapterTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapterTest.java
new file mode 100644
index 00000000000..cf1649b144c
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedToHeadersWindowStoreAdapterTest.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.query.WindowKeyQuery;
+import org.apache.kafka.streams.query.WindowRangeQuery;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Instant;
+import java.util.Properties;
+
+import static java.time.Duration.ofMillis;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TimestampedToHeadersWindowStoreAdapterTest {
+
+ private static final long WINDOW_SIZE = 10_000L;
+ private static final long RETENTION_PERIOD = 60_000L;
+ private static final long SEGMENT_INTERVAL = 30_000L;
+
+ private TimestampedToHeadersWindowStoreAdapter adapter;
+ private RocksDBTimestampedWindowStore underlyingStore;
+ private InternalMockProcessorContext<String, String> context;
+ private File baseDir;
+
+ @BeforeEach
+ public void setUp() {
+ final Properties props = StreamsTestUtils.getStreamsConfig();
+ baseDir = TestUtils.tempDirectory();
+ context = new InternalMockProcessorContext<>(
+ baseDir,
+ Serdes.String(),
+ Serdes.String(),
+ new StreamsConfig(props)
+ );
+
+ final SegmentedBytesStore segmentedBytesStore = new
RocksDBSegmentedBytesStore(
+ "iqv2-test-store",
+ "test-metrics-scope",
+ RETENTION_PERIOD,
+ SEGMENT_INTERVAL,
+ new WindowKeySchema()
+ );
+
+ underlyingStore = new RocksDBTimestampedWindowStore(
+ segmentedBytesStore,
+ false,
+ WINDOW_SIZE
+ );
+
+ adapter = new TimestampedToHeadersWindowStoreAdapter(underlyingStore);
+ adapter.init(context, adapter);
+ }
+
+ @AfterEach
+ public void tearDown() {
+ if (adapter != null) {
+ adapter.close();
+ }
+ }
+
+ @Test
+ public void shouldHandleWindowKeyQuerySuccessfully() {
+ // Build a typed window store using timestamped window store (adapter wraps it)
+ final TimestampedWindowStoreWithHeaders<String, String> store =
Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(
+ "typed-adapter-test",
+ ofMillis(RETENTION_PERIOD),
+ ofMillis(WINDOW_SIZE),
+ false),
+ Serdes.String(),
+ Serdes.String())
+ .withLoggingDisabled()
+ .build();
+
+ store.init(context, store);
+
+ try {
+ // Put data into the store (headers will be empty when adapted from timestamped store)
+ final Headers headers1 = new RecordHeaders();
+ headers1.add("key", "header1".getBytes());
+ store.put("test-key", ValueTimestampHeaders.make("value1", 1000L,
headers1), 1000L);
+
+ final Headers headers2 = new RecordHeaders();
+ headers2.add("key", "header2".getBytes());
+ store.put("test-key", ValueTimestampHeaders.make("value2", 5000L,
headers2), 5000L);
+
+ // Verify adapter is used for legacy timestamped window store
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(TimestampedToHeadersWindowStoreAdapter.class, wrapped,
+ "Expected TimestampedToHeadersWindowStoreAdapter for legacy timestamped window store");
+
+ // Query at typed level - WindowKeyQuery should return windowed values with timestamps
+ final WindowKeyQuery<String, ValueAndTimestamp<String>> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ "test-key",
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(10000L)
+ );
+ final QueryResult<WindowStoreIterator<ValueAndTimestamp<String>>>
result =
+ store.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ // Verify IQv2 query result
+ // Adapter delegates to RocksDBTimestampedWindowStore which supports IQv2
+ assertTrue(result.isSuccess(),
+ "Expected query to succeed since RocksDBTimestampedWindowStore supports IQv2");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+ assertNotNull(result.getResult(), "Expected non-null result");
+
+ // Verify the actual query results
+ final WindowStoreIterator<ValueAndTimestamp<String>> iterator =
result.getResult();
+ assertTrue(iterator.hasNext(), "Expected at least one result");
+
+ KeyValue<Long, ValueAndTimestamp<String>> kv = iterator.next();
+ assertEquals(1000L, kv.key, "Expected first window timestamp");
+ assertInstanceOf(ValueAndTimestamp.class, kv.value);
+ assertEquals("value1", kv.value.value(), "WindowKeyQuery should return the value");
+ assertEquals(1000L, kv.value.timestamp(), "WindowKeyQuery should return the timestamp");
+
+ assertTrue(iterator.hasNext(), "Expected second result");
+ kv = iterator.next();
+ assertEquals(5000L, kv.key, "Expected second window timestamp");
+ assertEquals("value2", kv.value.value(), "WindowKeyQuery should return the value");
+ assertEquals(5000L, kv.value.timestamp(), "WindowKeyQuery should return the timestamp");
+
+ assertFalse(iterator.hasNext(), "Expected no more results");
+ iterator.close();
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void shouldHandleWindowRangeQuerySuccessfully() {
+ // Build a typed window store using timestamped window store (adapter wraps it)
+ final TimestampedWindowStoreWithHeaders<String, String> store =
Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(
+ "typed-range-adapter-test",
+ ofMillis(RETENTION_PERIOD),
+ ofMillis(WINDOW_SIZE),
+ false),
+ Serdes.String(),
+ Serdes.String())
+ .withLoggingDisabled()
+ .build();
+
+ store.init(context, store);
+
+ try {
+ // Put data into the store (headers will be empty when adapted from timestamped store)
+ final Headers headers1 = new RecordHeaders();
+ headers1.add("source", "key1".getBytes());
+ store.put("key1", ValueTimestampHeaders.make("value1", 1000L,
headers1), 1000L);
+
+ final Headers headers2 = new RecordHeaders();
+ headers2.add("source", "key2".getBytes());
+ store.put("key2", ValueTimestampHeaders.make("value2", 5000L,
headers2), 5000L);
+
+ final Headers headers3 = new RecordHeaders();
+ headers3.add("source", "key3".getBytes());
+ store.put("key3", ValueTimestampHeaders.make("value3", 3000L,
headers3), 3000L);
+
+ // Verify adapter is used for legacy timestamped window store
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(TimestampedToHeadersWindowStoreAdapter.class, wrapped,
+ "Expected TimestampedToHeadersWindowStoreAdapter for legacy timestamped window store");
+
+ // Query at typed level - WindowRangeQuery should return all windowed key-values with timestamps
+ final WindowRangeQuery<String, ValueAndTimestamp<String>> query =
WindowRangeQuery.withWindowStartRange(
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(10000L)
+ );
+ final QueryResult<KeyValueIterator<Windowed<String>,
ValueAndTimestamp<String>>> result =
+ store.query(query, PositionBound.unbounded(), new
QueryConfig(false));
+
+ // Verify IQv2 query result
+ // Adapter delegates to RocksDBTimestampedWindowStore which supports IQv2
+ assertTrue(result.isSuccess(),
+ "Expected query to succeed since RocksDBTimestampedWindowStore supports IQv2");
+ assertNotNull(result.getResult(), "Expected result iterator to be present");
+ assertNotNull(result.getPosition(), "Expected position to be set");
+
+ // Verify the actual query results (should be sorted by key then timestamp)
+ final KeyValueIterator<Windowed<String>,
ValueAndTimestamp<String>> iterator = result.getResult();
+
+ assertTrue(iterator.hasNext(), "Expected at least one result");
+ KeyValue<Windowed<String>, ValueAndTimestamp<String>> kv =
iterator.next();
+ assertEquals("key1", kv.key.key(), "Expected first key");
+ assertEquals(1000L, kv.key.window().start(), "Expected first window start");
+ assertInstanceOf(ValueAndTimestamp.class, kv.value);
+ assertEquals("value1", kv.value.value(), "WindowRangeQuery should return the value");
+ assertEquals(1000L, kv.value.timestamp(), "WindowRangeQuery should return the timestamp");
+
+ assertTrue(iterator.hasNext(), "Expected second result");
+ kv = iterator.next();
+ assertEquals("key2", kv.key.key(), "Expected second key");
+ assertEquals(5000L, kv.key.window().start(), "Expected second window start");
+ assertEquals("value2", kv.value.value(), "WindowRangeQuery should return the value");
+
+ assertTrue(iterator.hasNext(), "Expected third result");
+ kv = iterator.next();
+ assertEquals("key3", kv.key.key(), "Expected third key");
+ assertEquals(3000L, kv.key.window().start(), "Expected third window start");
+ assertEquals("value3", kv.value.value(), "WindowRangeQuery should return the value");
+
+ assertFalse(iterator.hasNext(), "Expected no more results");
+ iterator.close();
+ } finally {
+ store.close();
+ }
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForWindowKeyQueryWhenRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable execution info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
adapter.query(query, positionBound, config);
+
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution info to be collected");
+ boolean foundAdapterInfo = false;
+ for (final String info : result.getExecutionInfo()) {
+ if (info.contains("Handled in") &&
info.contains(TimestampedToHeadersWindowStoreAdapter.class.getName())) {
+ foundAdapterInfo = true;
+ break;
+ }
+ }
+ assertTrue(foundAdapterInfo, "Expected execution info to mention the adapter class");
+ }
+
+ @Test
+ public void shouldCollectExecutionInfoForWindowRangeQueryWhenRequested() {
+ final WindowRangeQuery<Bytes, byte[]> query =
WindowRangeQuery.withWindowStartRange(
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(true); // Enable execution info
+
+ final QueryResult<KeyValueIterator<Windowed<Bytes>, byte[]>> result =
+ adapter.query(query, positionBound, config);
+
+ assertFalse(result.getExecutionInfo().isEmpty(), "Expected execution info to be collected");
+ boolean foundAdapterInfo = false;
+ for (final String info : result.getExecutionInfo()) {
+ if (info.contains("Handled in") &&
info.contains(TimestampedToHeadersWindowStoreAdapter.class.getName())) {
+ foundAdapterInfo = true;
+ break;
+ }
+ }
+ assertTrue(foundAdapterInfo, "Expected execution info to mention the adapter class");
+ }
+
+ @Test
+ public void shouldNotCollectExecutionInfoWhenNotRequested() {
+ final WindowKeyQuery<Bytes, byte[]> query =
WindowKeyQuery.withKeyAndWindowStartRange(
+ new Bytes("test-key".getBytes()),
+ Instant.ofEpochMilli(0),
+ Instant.ofEpochMilli(Long.MAX_VALUE)
+ );
+ final PositionBound positionBound = PositionBound.unbounded();
+ final QueryConfig config = new QueryConfig(false); // Disable execution info
+
+ final QueryResult<WindowStoreIterator<byte[]>> result =
adapter.query(query, positionBound, config);
+
+ // Verify: Adapter's execution info was not collected
+ boolean foundAdapterInfo = false;
+ for (final String info : result.getExecutionInfo()) {
+ if
(info.contains(TimestampedToHeadersWindowStoreAdapter.class.getName())) {
+ foundAdapterInfo = true;
+ break;
+ }
+ }
+ assertFalse(foundAdapterInfo, "Expected no execution info from adapter when not requested");
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
index 304a84c6976..67216bca411 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilderTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -40,166 +41,170 @@ import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.STRICT_STUBS)
public class TimestampedWindowStoreWithHeadersBuilderTest {
- private static final String STORE_NAME = "name";
- private static final String METRICS_SCOPE = "metricsScope";
-
- @Mock
- private WindowBytesStoreSupplier supplier;
- @Mock
- private RocksDBTimestampedWindowStoreWithHeaders
timestampedStoreWithHeaders;
-
- private TimestampedWindowStoreWithHeadersBuilder<String, String> builder;
-
- public void setUp() {
- when(supplier.name()).thenReturn(STORE_NAME);
- when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
- when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
-
- builder = new TimestampedWindowStoreWithHeadersBuilder<>(
- supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime());
- }
-
- @Test
- public void shouldHaveMeteredStoreAsOuterStore() {
- setUp();
- final TimestampedWindowStoreWithHeaders<String, String> store =
builder.build();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
- }
-
- @Test
- public void shouldHaveChangeLoggingStoreByDefault() {
- setUp();
- final TimestampedWindowStoreWithHeaders<String, String> store =
builder.build();
- final StateStore next = ((WrappedStateStore) store).wrapped();
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
next);
- }
-
- @Test
- public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
- setUp();
- final TimestampedWindowStoreWithHeaders<String, String> store =
builder.withLoggingDisabled().build();
- final StateStore next = ((WrappedStateStore) store).wrapped();
- assertSame(timestampedStoreWithHeaders, next);
- }
-
- @Test
- public void shouldHaveCachingStoreWhenEnabled() {
- setUp();
- final TimestampedWindowStoreWithHeaders<String, String> store =
builder.withCachingEnabled().build();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
- assertInstanceOf(CachingWindowStore.class, wrapped);
- }
-
- @Test
- public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
- setUp();
- final TimestampedWindowStoreWithHeaders<String, String> store = builder
- .withLoggingEnabled(Collections.emptyMap())
- .build();
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
wrapped);
- assertSame(timestampedStoreWithHeaders, ((WrappedStateStore)
wrapped).wrapped());
- }
-
- @Test
- public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
- setUp();
- final TimestampedWindowStoreWithHeaders<String, String> store = builder
- .withLoggingEnabled(Collections.emptyMap())
- .withCachingEnabled()
- .build();
- final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
- final WrappedStateStore changeLogging = (WrappedStateStore)
caching.wrapped();
- assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
- assertInstanceOf(CachingWindowStore.class, caching);
-
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
changeLogging);
- assertSame(timestampedStoreWithHeaders, changeLogging.wrapped());
- }
-
- @Test
- public void shouldNotWrapHeadersByteStore() {
- when(supplier.name()).thenReturn(STORE_NAME);
- when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
- when(supplier.get()).thenReturn(new
RocksDBTimestampedWindowStoreWithHeaders(
- new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
- "name",
- "metric-scope",
- 10L,
- 5L,
- new WindowKeySchema()),
- false,
- 1L));
-
- builder = new TimestampedWindowStoreWithHeadersBuilder<>(
- supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime());
-
- final TimestampedWindowStoreWithHeaders<String, String> store = builder
- .withLoggingDisabled()
- .withCachingDisabled()
- .build();
- assertInstanceOf(RocksDBTimestampedWindowStoreWithHeaders.class,
((WrappedStateStore) store).wrapped());
- }
-
- @Test
- public void shouldWrapTimestampedStoreAsHeadersStore() {
- when(supplier.name()).thenReturn(STORE_NAME);
- when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
- when(supplier.get()).thenReturn(new RocksDBTimestampedWindowStore(
- new RocksDBTimestampedSegmentedBytesStore(
- "name",
- "metric-scope",
- 10L,
- 5L,
- new WindowKeySchema()),
- false,
- 1L));
-
- builder = new TimestampedWindowStoreWithHeadersBuilder<>(
- supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime());
-
- final TimestampedWindowStoreWithHeaders<String, String> store = builder
- .withLoggingDisabled()
- .withCachingDisabled()
- .build();
- assertInstanceOf(TimestampedToHeadersWindowStoreAdapter.class,
((WrappedStateStore) store).wrapped());
- }
-
- @Test
- public void shouldDisableCachingWithRetainDuplicates() {
- when(supplier.name()).thenReturn(STORE_NAME);
- when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
- when(supplier.retainDuplicates()).thenReturn(true);
- when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
-
- builder = new TimestampedWindowStoreWithHeadersBuilder<>(
- supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime());
-
- final TimestampedWindowStoreWithHeaders<String, String> store = builder
- .withCachingEnabled()
- .withLoggingDisabled()
- .build();
-
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- // Caching should be automatically disabled when retainDuplicates is true
- assertSame(timestampedStoreWithHeaders, wrapped);
- }
- @Test
- public void shouldThrowNullPointerIfInnerIsNull() {
- assertThrows(NullPointerException.class, () -> new
TimestampedWindowStoreWithHeadersBuilder<>(null, Serdes.String(),
Serdes.String(), new MockTime()));
+ @Nested
+ class BuilderTests {
+ private static final String STORE_NAME = "name";
+ private static final String METRICS_SCOPE = "metricsScope";
+
+ @Mock
+ private WindowBytesStoreSupplier supplier;
+ @Mock
+ private RocksDBTimestampedWindowStoreWithHeaders
timestampedStoreWithHeaders;
+
+ private TimestampedWindowStoreWithHeadersBuilder<String, String>
builder;
+
+ public void setUp() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+ }
+
+ @Test
+ public void shouldHaveMeteredStoreAsOuterStore() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.build();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreByDefault() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.build();
+ final StateStore next = ((WrappedStateStore) store).wrapped();
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
next);
+ }
+
+ @Test
+ public void shouldNotHaveChangeLoggingStoreWhenDisabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.withLoggingDisabled().build();
+ final StateStore next = ((WrappedStateStore) store).wrapped();
+ assertSame(timestampedStoreWithHeaders, next);
+ }
+
+ @Test
+ public void shouldHaveCachingStoreWhenEnabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder.withCachingEnabled().build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ assertInstanceOf(CachingWindowStore.class, wrapped);
+ }
+
+ @Test
+ public void shouldHaveChangeLoggingStoreWhenLoggingEnabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder
+ .withLoggingEnabled(Collections.emptyMap())
+ .build();
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
wrapped);
+ assertSame(timestampedStoreWithHeaders, ((WrappedStateStore)
wrapped).wrapped());
+ }
+
+ @Test
+ public void shouldHaveCachingAndChangeLoggingWhenBothEnabled() {
+ setUp();
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder
+ .withLoggingEnabled(Collections.emptyMap())
+ .withCachingEnabled()
+ .build();
+ final WrappedStateStore caching = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ final WrappedStateStore changeLogging = (WrappedStateStore)
caching.wrapped();
+ assertInstanceOf(MeteredTimestampedWindowStoreWithHeaders.class,
store);
+ assertInstanceOf(CachingWindowStore.class, caching);
+
assertInstanceOf(ChangeLoggingTimestampedWindowBytesStoreWithHeaders.class,
changeLogging);
+ assertSame(timestampedStoreWithHeaders, changeLogging.wrapped());
+ }
+
+ @Test
+ public void shouldNotWrapHeadersByteStore() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.get()).thenReturn(new
RocksDBTimestampedWindowStoreWithHeaders(
+ new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+ "name",
+ "metric-scope",
+ 10L,
+ 5L,
+ new WindowKeySchema()),
+ false,
+ 1L));
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertInstanceOf(RocksDBTimestampedWindowStoreWithHeaders.class,
((WrappedStateStore) store).wrapped());
+ }
+
+ @Test
+ public void shouldWrapTimestampedStoreAsHeadersStore() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.get()).thenReturn(new RocksDBTimestampedWindowStore(
+ new RocksDBTimestampedSegmentedBytesStore(
+ "name",
+ "metric-scope",
+ 10L,
+ 5L,
+ new WindowKeySchema()),
+ false,
+ 1L));
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder
+ .withLoggingDisabled()
+ .withCachingDisabled()
+ .build();
+ assertInstanceOf(TimestampedToHeadersWindowStoreAdapter.class,
((WrappedStateStore) store).wrapped());
+ }
+
+ @Test
+ public void shouldDisableCachingWithRetainDuplicates() {
+ when(supplier.name()).thenReturn(STORE_NAME);
+ when(supplier.metricsScope()).thenReturn(METRICS_SCOPE);
+ when(supplier.retainDuplicates()).thenReturn(true);
+ when(supplier.get()).thenReturn(timestampedStoreWithHeaders);
+
+ builder = new TimestampedWindowStoreWithHeadersBuilder<>(
+ supplier,
+ Serdes.String(),
+ Serdes.String(),
+ new MockTime());
+
+ final TimestampedWindowStoreWithHeaders<String, String> store =
builder
+ .withCachingEnabled()
+ .withLoggingDisabled()
+ .build();
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ // Caching should be automatically disabled when retainDuplicates is true
+ assertSame(timestampedStoreWithHeaders, wrapped);
+ }
+
+ @Test
+ public void shouldThrowNullPointerIfInnerIsNull() {
+ assertThrows(NullPointerException.class, () -> new
TimestampedWindowStoreWithHeadersBuilder<>(null, Serdes.String(),
Serdes.String(), new MockTime()));
+ }
}
}