mjsax commented on code in PR #21581:
URL: https://github.com/apache/kafka/pull/21581#discussion_r2876673340
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,464 @@ public void process(final Record<String, String> record) {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ headersBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair("key1", "value1")),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ baseTime + 100,
+ false);
+
+ // Wait for processing
+ Thread.sleep(3000);
Review Comment:
This screams "I am flaky" -- can we do this differently?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,464 @@ public void process(final Record<String, String> record) {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
Review Comment:
Did you look into
`StoreUpgradeIntegrationTest#shouldFailDowngradeFromTimestampedToRegularKeyValueStore`?
Might be simpler to follow this pattern?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,464 @@ public void process(final Record<String, String> record) {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ headersBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair("key1", "value1")),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ baseTime + 100,
+ false);
+
+ // Wait for processing
+ Thread.sleep(3000);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ // Note: Window stores use segment-level versioning, not column family
detection like key-value stores.
+ // Therefore, they do not throw an explicit "downgrade not supported"
exception during initialization.
+ // Instead, attempting to read headers-aware data with a
non-headers-aware store will result in
+ // data corruption (misinterpreting [headers][timestamp][value] as
[timestamp][value]).
+ //
+ // This test verifies that downgrade without cleanup leads to data
corruption or read failures.
+ boolean dataIsCorruptedOrUnreadable = false;
+
+ try {
+ kafkaStreams.start();
+ Thread.sleep(5000);
+
+ // Try to read the data that was written with headers
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store
=
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ fail("Store should be available after startup");
+ }
+
+ final long windowStart = baseTime + 100 - ((baseTime + 100) %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch("key1",
windowStart);
+
+ // The data should either:
+ // 1. Be null (unable to read due to format mismatch), or
+ // 2. Be corrupted (wrong value/timestamp due to misinterpreting
the format)
+ if (result == null) {
+ // Data is null - indicates read failure due to format
mismatch (expected)
+ dataIsCorruptedOrUnreadable = true;
+ } else {
+ // If we can read something, verify it's corrupted
+ final boolean isCorrupted = !result.value().equals("value1")
|| result.timestamp() != (baseTime + 100);
+ if (isCorrupted) {
+ // Data is corrupted as expected
+ dataIsCorruptedOrUnreadable = true;
+ } else {
+ // Data was read correctly - this should NOT happen
+ fail("Expected data corruption or read failure when
downgrading without cleanup, " +
+ "but data was read correctly (value=" + result.value()
+ ", timestamp=" + result.timestamp() + "). " +
+ "downgrades without cleanup should not succeed.");
+ }
+ }
+ } catch (final Exception e) {
+ // An exception during read is also acceptable - indicates format
mismatch caused a failure
+ dataIsCorruptedOrUnreadable = true;
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ // Verify that the downgrade resulted in data corruption or read
failure
+ if (!dataIsCorruptedOrUnreadable) {
+ fail("Downgrade without cleanup should result in data corruption
or read failure");
+ }
+ }
+
+ @Test
+ public void
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup()
throws Exception {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ headersBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair("key1", "value1")),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ baseTime + 100,
+ false);
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair("key2", "value2")),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ baseTime + 200,
+ false);
+
+ // Wait for processing
+ Thread.sleep(3000);
Review Comment:
We should **_never_** put `sleep()` into a test.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,464 @@ public void process(final Record<String, String> record) {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
+ final StreamsBuilder headersBuilder = new StreamsBuilder();
+ headersBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair("key1", "value1")),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
StringSerializer.class, StringSerializer.class),
+ headers,
+ baseTime + 100,
+ false);
+
+ // Wait for processing
+ Thread.sleep(3000);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ // Note: Window stores use segment-level versioning, not column family
detection like key-value stores.
+ // Therefore, they do not throw an explicit "downgrade not supported"
exception during initialization.
+ // Instead, attempting to read headers-aware data with a
non-headers-aware store will result in
+ // data corruption (misinterpreting [headers][timestamp][value] as
[timestamp][value]).
+ //
+ // This test verifies that downgrade without cleanup leads to data
corruption or read failures.
+ boolean dataIsCorruptedOrUnreadable = false;
+
+ try {
+ kafkaStreams.start();
+ Thread.sleep(5000);
+
+ // Try to read the data that was written with headers
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store
=
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ fail("Store should be available after startup");
+ }
+
+ final long windowStart = baseTime + 100 - ((baseTime + 100) %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch("key1",
windowStart);
+
+ // The data should either:
+ // 1. Be null (unable to read due to format mismatch), or
+ // 2. Be corrupted (wrong value/timestamp due to misinterpreting
the format)
+ if (result == null) {
+ // Data is null - indicates read failure due to format
mismatch (expected)
+ dataIsCorruptedOrUnreadable = true;
+ } else {
+ // If we can read something, verify it's corrupted
+ final boolean isCorrupted = !result.value().equals("value1")
|| result.timestamp() != (baseTime + 100);
+ if (isCorrupted) {
+ // Data is corrupted as expected
+ dataIsCorruptedOrUnreadable = true;
+ } else {
+ // Data was read correctly - this should NOT happen
+ fail("Expected data corruption or read failure when
downgrading without cleanup, " +
+ "but data was read correctly (value=" + result.value()
+ ", timestamp=" + result.timestamp() + "). " +
+ "downgrades without cleanup should not succeed.");
+ }
+ }
+ } catch (final Exception e) {
+ // An exception during read is also acceptable - indicates format
mismatch caused a failure
+ dataIsCorruptedOrUnreadable = true;
+ } finally {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ }
+
+ // Verify that the downgrade resulted in data corruption or read
failure
+ if (!dataIsCorruptedOrUnreadable) {
+ fail("Downgrade without cleanup should result in data corruption
or read failure");
+ }
+ }
+
+ @Test
+ public void
shouldSuccessfullyDowngradeFromTimestampedWindowStoreWithHeadersAfterCleanup()
throws Exception {
Review Comment:
Same as above: avoid `sleep()` in tests. Maybe compare with `StoreUpgradeIntegrationTest` to simplify this code?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]