mjsax commented on code in PR #21581:
URL: https://github.com/apache/kafka/pull/21581#discussion_r2881562210


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,464 @@ public void process(final Record<String, String> record) {
             store.put(record.key(), ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers()));
         }
     }
+
    /**
     * Migration scenario for the in-memory variant: delegates to the shared
     * helper with {@code persistentStore = false}.
     */
    @Test
    public void shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
        shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
    }
+
    /**
     * Migration scenario for the persistent (RocksDB-backed) variant:
     * delegates to the shared helper with {@code persistentStore = true}.
     */
    @Test
    public void shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws Exception {
        shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
    }
+
+    /**
+     * Tests migration from TimestampedWindowStore to 
TimestampedWindowStoreWithHeaders.
+     * This is a true migration where both supplier and builder are upgraded.
+     */
+    private void 
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final 
boolean persistentStore) throws Exception {
+        // Phase 1: Run with old TimestampedWindowStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    persistentStore
+                        ? 
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+                        : Stores.inMemoryWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime 
+ 100);
+        processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime 
+ 200);
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime 
+ 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    persistentStore
+                        ? 
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+                        : Stores.inMemoryWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "migration-test".getBytes());
+        headers.add("version", "1.0".getBytes());
+
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", 
baseTime + 350, headers, headers);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void 
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws 
Exception {
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime 
+ 100);
+        processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime 
+ 200);
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime 
+ 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Restart with headers-aware builder but non-headers supplier 
(proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),  // 
non-headers supplier!
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+
+        // In proxy mode, headers are stripped when writing to non-headers 
store
+        // So we expect empty headers when reading back
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", 
baseTime + 350, headers, expectedHeaders);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, expectedHeaders);
+
+        kafkaStreams.close();
+    }
+
+    private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+                                                             final String 
value,
+                                                             final long 
timestamp) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+                final ValueAndTimestamp<String> result = store.fetch(key, 
windowStart);
+
+                return result != null
+                    && result.value().equals(value)
+                    && result.timestamp() == timestamp;
+            } catch (final Exception e) {
+                return false;
+            }
+        }, 60_000L, "Could not verify timestamped value in time.");
+    }
+
+    private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+                                                              final String 
value,
+                                                              final long 
timestamp,
+                                                              final Headers 
headers,
+                                                              final Headers 
expectedHeaders) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+
+                final List<KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all()) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.key.window().start() == windowStart) {
+                            results.add(kv);
+                        }
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> result = 
results.get(0).value;
+                return result != null
+                    && result.value().equals(value)
+                    && result.timestamp() == timestamp
+                    && result.headers().equals(expectedHeaders);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify windowed value with headers in time.");
+    }
+
+    private void verifyWindowValueWithEmptyHeaders(final String key,
+                                                    final String value,
+                                                    final long timestamp) 
throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+
+                final List<KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all()) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.key.window().start() == windowStart) {
+                            results.add(kv);
+                        }
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> result = 
results.get(0).value;
+                assertNotNull(result, "Result should not be null");
+                assertEquals(value, result.value(), "Value should match");
+                assertEquals(timestamp, result.timestamp(), "Timestamp should 
match");
+
+                // Verify headers exist but are empty (migrated from 
timestamped store without headers)
+                assertNotNull(result.headers(), "Headers should not be null 
for migrated data");
+                assertEquals(0, result.headers().toArray().length, "Headers 
should be empty for migrated data");
+
+                return true;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify legacy value with empty headers in 
time.");
+    }
+
+    /**
+     * Processor for TimestampedWindowStore (without headers).
+     */
+    private static class TimestampedWindowedProcessor implements 
Processor<String, String, Void, Void> {
+        private TimestampedWindowStore<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long windowStart = record.timestamp() - (record.timestamp() 
% WINDOW_SIZE_MS);
+            store.put(record.key(), ValueAndTimestamp.make(record.value(), 
record.timestamp()), windowStart);
+        }
+    }
+
+    /**
+     * Processor for TimestampedWindowStoreWithHeaders (with headers).
+     */
+    private static class TimestampedWindowedWithHeadersProcessor implements 
Processor<String, String, Void, Void> {
+        private TimestampedWindowStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long windowStart = record.timestamp() - (record.timestamp() 
% WINDOW_SIZE_MS);
+            store.put(record.key(),
+                ValueTimestampHeaders.make(record.value(), record.timestamp(), 
record.headers()),
+                windowStart);
+        }
+    }
+
+    @Test
+    public void 
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
 throws Exception {

Review Comment:
   Did some digging and it's been like this since 2016, when this code was added 
(https://github.com/apache/kafka/pull/2166 added `Segments.java`; it was later 
refactored and the code was moved into `AbstractSegments.java`).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to