mjsax commented on code in PR #21581:
URL: https://github.com/apache/kafka/pull/21581#discussion_r2881571041
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,456 @@ public void process(final Record<String, String> record) {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
throws Exception {
+
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+ }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
+ final Properties props = props();
+ final long baseTime = setupAndPopulateWindowStoreWithHeaders(props,
singletonList(KeyValue.pair("key1", 100L)));
+ kafkaStreams = null;
+
+ // Attempt to downgrade to non-headers window store
+ final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+ downgradedBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
+ Duration.ofMillis(RETENTION_MS),
+ Duration.ofMillis(WINDOW_SIZE_MS),
+ false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+ try {
+ kafkaStreams.start();
+
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store
=
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams,
QueryableStoreTypes.timestampedWindowStore());
+
+ final long windowStart = baseTime + 100 - ((baseTime + 100) %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch("key1",
windowStart);
+
+ // If we can read the data correctly, the test should fail
+ if (result != null && result.value().equals("value1") &&
result.timestamp() == (baseTime + 100)) {
+ throw new AssertionError("Expected data corruption or read
failure when downgrading without cleanup, " +
+ "but data was read correctly (value=" + result.value() +
", timestamp=" + result.timestamp() + "). " +
+ "Downgrades without cleanup should not succeed.");
+ }
+ // Otherwise (null or corrupted data), the downgrade failed as
expected
Review Comment:
We should never reach this part, right? We either crash with an exception, or
we throw the assertion error.
So we should also fail the test explicitly here, to avoid a case where the
test passes incorrectly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]