mjsax commented on code in PR #21581:
URL: https://github.com/apache/kafka/pull/21581#discussion_r2881562210
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,464 @@ public void process(final Record<String, String> record) {
store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
}
}
+
    // In-memory variant of the migration scenario; delegates to the shared helper.
    @Test
    public void shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
        shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
    }
+
    // Persistent (RocksDB) variant of the migration scenario; delegates to the shared helper.
    @Test
    public void shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
        shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
    }
+
+ /**
+ * Tests migration from TimestampedWindowStore to
TimestampedWindowStoreWithHeaders.
+ * This is a true migration where both supplier and builder are upgraded.
+ */
+ private void
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final
boolean persistentStore) throws Exception {
+ // Phase 1: Run with old TimestampedWindowStore
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ persistentStore
+ ?
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+ : Stores.inMemoryWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "migration-test".getBytes());
+ headers.add("version", "1.0".getBytes());
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, headers);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws
Exception {
+ final StreamsBuilder oldBuilder = new StreamsBuilder();
+ oldBuilder.addStateStore(
+ Stores.timestampedWindowStoreBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+ kafkaStreams.start();
+
+ final long baseTime = CLUSTER.time.milliseconds();
+ processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime
+ 100);
+ processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime
+ 200);
+ processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime
+ 300);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ // Restart with headers-aware builder but non-headers supplier
(proxy/adapter mode)
+ final StreamsBuilder newBuilder = new StreamsBuilder();
+ newBuilder.addStateStore(
+ Stores.timestampedWindowStoreWithHeadersBuilder(
+ Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false), //
non-headers supplier!
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedWindowedWithHeadersProcessor::new,
WINDOW_STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+ kafkaStreams.start();
+
+ verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+ verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+ verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+
+ // In proxy mode, headers are stripped when writing to non-headers
store
+ // So we expect empty headers when reading back
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated",
baseTime + 350, headers, expectedHeaders);
+ processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime
+ 400, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+ final String
value,
+ final long
timestamp) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String, ValueAndTimestamp<String>>
store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+ final ValueAndTimestamp<String> result = store.fetch(key,
windowStart);
+
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 60_000L, "Could not verify timestamped value in time.");
+ }
+
+ private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+ final String
value,
+ final long
timestamp,
+ final Headers
headers,
+ final Headers
expectedHeaders) throws Exception {
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify windowed value with headers in time.");
+ }
+
+ private void verifyWindowValueWithEmptyHeaders(final String key,
+ final String value,
+ final long timestamp)
throws Exception {
+ TestUtils.waitForCondition(() -> {
+ try {
+ final ReadOnlyWindowStore<String,
ValueTimestampHeaders<String>> store =
+ IntegrationTestUtils.getStore(WINDOW_STORE_NAME,
kafkaStreams, QueryableStoreTypes.windowStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final long windowStart = timestamp - (timestamp %
WINDOW_SIZE_MS);
+
+ final List<KeyValue<Windowed<String>,
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+ try (final KeyValueIterator<Windowed<String>,
ValueTimestampHeaders<String>> iterator = store.all()) {
+ while (iterator.hasNext()) {
+ final KeyValue<Windowed<String>,
ValueTimestampHeaders<String>> kv = iterator.next();
+ if (kv.key.key().equals(key) &&
kv.key.window().start() == windowStart) {
+ results.add(kv);
+ }
+ }
+ }
+
+ if (results.isEmpty()) {
+ return false;
+ }
+
+ final ValueTimestampHeaders<String> result =
results.get(0).value;
+ assertNotNull(result, "Result should not be null");
+ assertEquals(value, result.value(), "Value should match");
+ assertEquals(timestamp, result.timestamp(), "Timestamp should
match");
+
+ // Verify headers exist but are empty (migrated from
timestamped store without headers)
+ assertNotNull(result.headers(), "Headers should not be null
for migrated data");
+ assertEquals(0, result.headers().toArray().length, "Headers
should be empty for migrated data");
+
+ return true;
+ } catch (final Exception e) {
+ e.printStackTrace();
+ return false;
+ }
+ }, 60_000L, "Could not verify legacy value with empty headers in
time.");
+ }
+
+ /**
+ * Processor for TimestampedWindowStore (without headers).
+ */
+ private static class TimestampedWindowedProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()), windowStart);
+ }
+ }
+
+ /**
+ * Processor for TimestampedWindowStoreWithHeaders (with headers).
+ */
+ private static class TimestampedWindowedWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedWindowStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(WINDOW_STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ final long windowStart = record.timestamp() - (record.timestamp()
% WINDOW_SIZE_MS);
+ store.put(record.key(),
+ ValueTimestampHeaders.make(record.value(), record.timestamp(),
record.headers()),
+ windowStart);
+ }
+ }
+
+ @Test
+ public void
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
throws Exception {
Review Comment:
Did some digging, and it has been like this since 2016, when this code was added
(https://github.com/apache/kafka/pull/2166 added `Segments.java`, which was later
refactored, and the code was moved into `AbstractSegments.java`).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]