This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5fc5d7591c5 KAFKA-20134: Implement TimestampedWindowStoreWithHeaders
(N/N) (#21581)
5fc5d7591c5 is described below
commit 5fc5d7591c515f91dd8b1a92fec40aacb1a31f1a
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Mar 6 18:09:42 2026 +0000
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (N/N) (#21581)
This PR implements the upgrade integration tests for
`TimestampedWindowStateStoreWithHeaders` introduced in KIP-1271.
The class should be reviewed: `HeadersStoreUpgradeIntegrationTest`
This should not be merged before #21497
Reviewers: Matthias J. Sax <[email protected]>, Alieh Saeedi
<[email protected]>
---
.../HeadersStoreUpgradeIntegrationTest.java | 472 +++++++++++++++++++++
1 file changed, 472 insertions(+)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 9b6371f5a19..2976f0713fe 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -25,17 +25,23 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.TimestampedWindowStore;
+import org.apache.kafka.streams.state.TimestampedWindowStoreWithHeaders;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
import org.apache.kafka.test.TestUtils;
@@ -50,14 +56,23 @@ import org.junit.jupiter.api.TestInfo;
import java.io.IOException;
import java.time.Duration;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
+import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
@Tag("integration")
public class HeadersStoreUpgradeIntegrationTest {
private static final String STORE_NAME = "store";
+ private static final String WINDOW_STORE_NAME = "window-store";
+ private static final long WINDOW_SIZE_MS = 1000L;
+ private static final long RETENTION_MS = Duration.ofDays(1).toMillis();
+
private String inputStream;
private KafkaStreams kafkaStreams;
@@ -349,4 +364,461 @@ public class HeadersStoreUpgradeIntegrationTest {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateWindowStoreWithHeaders(props,
singletonList(KeyValue.pair("key1", 100L)));
+ kafkaStreams = null;
+
+ // Attempt to downgrade to non-headers window store
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ boolean exceptionThrown = false;
+ try {
+ kafkaStreams.start();
+ } catch (final Exception e) {
+ Throwable cause = e;
+ while (cause != null) {
+ if (cause instanceof ProcessorStateException &&
+ cause.getMessage() != null &&
+ cause.getMessage().contains("headers-aware") &&
+ cause.getMessage().contains("Downgrade")) {
+ exceptionThrown = true;
+ break;
+ }
+ cause = cause.getCause();
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException
about downgrade not being supported, but got: " + e.getMessage(), e);
+ }
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ if (!exceptionThrown) {
+ throw new AssertionError("Expected ProcessorStateException to be
thrown when attempting to downgrade from headers-aware to non-headers window
store");
+ }
+ }
+
+ @Test
+ public void
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup()
throws Exception {
+ final Properties props = props();
+ setupAndPopulateWindowStoreWithHeaders(props,
asList(KeyValue.pair("key1", 100L), KeyValue.pair("key2", 200L)));
+
+ kafkaStreams.cleanUp(); // Delete local state
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long newTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", newTime
+ 300);
+ processWindowedKeyValueAndVerifyTimestamped("key4", "value4", newTime
+ 400);
+
+ kafkaStreams.close();
+ }
+
+ private boolean windowStoreContainsKey(final String key, final long
timestamp) {
+ try {
+ final ReadOnlyWindowStore<String, ValueTimestampHeaders<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long expectedWindowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) && kv.key.window().start() ==
expectedWindowStart) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } catch (final Exception e) {
+ return false;
+ }
+ }
+
+ /**
+ * Setup and populate a window store with headers.
+ * @param props Streams properties
+ * @param records List of (key, timestampOffset) tuples. Values will be
generated as "value{N}"
+ * @return base time used for record timestamps
+ */
+ private long setupAndPopulateWindowStoreWithHeaders(final Properties props,
+ final
List<KeyValue<String, Long>> records) throws Exception {
+ final long baseTime = setupWindowStoreWithHeaders(props);
+
+ for (int i = 0; i < records.size(); i++) {
+ final KeyValue<String, Long> record = records.get(i);
+ final String value = "value" + (i + 1);
+ produceRecordWithHeaders(record.key, value, baseTime +
record.value);
+ }
+
+ // Wait for all records to be processed
+ TestUtils.waitForCondition(
+ () -> {
+ for (final KeyValue<String, Long> record : records) {
+ if (!windowStoreContainsKey(record.key, baseTime +
record.value)) {
+ return false;
+ }
+ }
+ return true;
+ },
+ 30_000L,
+ "Store was not populated with expected data"
+ );
+
+ kafkaStreams.close();
+ return baseTime;
+ }
+
+ private long setupWindowStoreWithHeaders(final Properties props) {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ headersBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ kafkaStreams.start();
+
+ return CLUSTER.time.milliseconds();
+ }
+
+ private void produceRecordWithHeaders(final String key, final String
value, final long timestamp) throws Exception {
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+ }
}
\ No newline at end of file