mjsax commented on code in PR #21581:
URL: https://github.com/apache/kafka/pull/21581#discussion_r2881571041


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -349,4 +363,456 @@ public void process(final Record<String, String> record) {
             store.put(record.key(), ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers()));
         }
     }
+
+    @Test
+    public void 
shouldMigrateInMemoryTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
 throws Exception {
+        
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(false);
+    }
+
+    @Test
+    public void 
shouldMigratePersistentTimestampedWindowStoreToTimestampedWindowStoreWithHeaders()
 throws Exception {
+        
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(true);
+    }
+
+    /**
+     * Tests migration from TimestampedWindowStore to 
TimestampedWindowStoreWithHeaders.
+     * This is a true migration where both supplier and builder are upgraded.
+     */
+    private void 
shouldMigrateTimestampedWindowStoreToTimestampedWindowStoreWithHeaders(final 
boolean persistentStore) throws Exception {
+        // Phase 1: Run with old TimestampedWindowStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    persistentStore
+                        ? 
Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+                        : Stores.inMemoryWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime 
+ 100);
+        processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime 
+ 200);
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime 
+ 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    persistentStore
+                        ? 
Stores.persistentTimestampedWindowStoreWithHeaders(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false)
+                        : Stores.inMemoryWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "migration-test".getBytes());
+        headers.add("version", "1.0".getBytes());
+
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", 
baseTime + 350, headers, headers);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void 
shouldProxyTimestampedWindowStoreToTimestampedWindowStoreWithHeaders() throws 
Exception {
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processWindowedKeyValueAndVerifyTimestamped("key1", "value1", baseTime 
+ 100);
+        processWindowedKeyValueAndVerifyTimestamped("key2", "value2", baseTime 
+ 200);
+        processWindowedKeyValueAndVerifyTimestamped("key3", "value3", baseTime 
+ 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Restart with headers-aware builder but non-headers supplier 
(proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        newBuilder.addStateStore(
+                Stores.timestampedWindowStoreWithHeadersBuilder(
+                    Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME, 
Duration.ofMillis(RETENTION_MS), Duration.ofMillis(WINDOW_SIZE_MS), false),  // 
non-headers supplier!
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedWithHeadersProcessor::new, 
WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        kafkaStreams.start();
+
+        verifyWindowValueWithEmptyHeaders("key1", "value1", baseTime + 100);
+        verifyWindowValueWithEmptyHeaders("key2", "value2", baseTime + 200);
+        verifyWindowValueWithEmptyHeaders("key3", "value3", baseTime + 300);
+
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+
+        // In proxy mode, headers are stripped when writing to non-headers 
store
+        // So we expect empty headers when reading back
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processWindowedKeyValueWithHeadersAndVerify("key3", "value3-updated", 
baseTime + 350, headers, expectedHeaders);
+        processWindowedKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, expectedHeaders);
+
+        kafkaStreams.close();
+    }
+
+    private void processWindowedKeyValueAndVerifyTimestamped(final String key,
+                                                             final String 
value,
+                                                             final long 
timestamp) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> 
store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+                final ValueAndTimestamp<String> result = store.fetch(key, 
windowStart);
+
+                return result != null
+                    && result.value().equals(value)
+                    && result.timestamp() == timestamp;
+            } catch (final Exception e) {
+                return false;
+            }
+        }, 60_000L, "Could not verify timestamped value in time.");
+    }
+
+    private void processWindowedKeyValueWithHeadersAndVerify(final String key,
+                                                              final String 
value,
+                                                              final long 
timestamp,
+                                                              final Headers 
headers,
+                                                              final Headers 
expectedHeaders) throws Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+
+                final List<KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all()) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.key.window().start() == windowStart) {
+                            results.add(kv);
+                        }
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> result = 
results.get(0).value;
+                return result != null
+                    && result.value().equals(value)
+                    && result.timestamp() == timestamp
+                    && result.headers().equals(expectedHeaders);
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify windowed value with headers in time.");
+    }
+
+    private void verifyWindowValueWithEmptyHeaders(final String key,
+                                                    final String value,
+                                                    final long timestamp) 
throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlyWindowStore<String, 
ValueTimestampHeaders<String>> store =
+                    IntegrationTestUtils.getStore(WINDOW_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.windowStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                final long windowStart = timestamp - (timestamp % 
WINDOW_SIZE_MS);
+
+                final List<KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>>> results = new LinkedList<>();
+                try (final KeyValueIterator<Windowed<String>, 
ValueTimestampHeaders<String>> iterator = store.all()) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, 
ValueTimestampHeaders<String>> kv = iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.key.window().start() == windowStart) {
+                            results.add(kv);
+                        }
+                    }
+                }
+
+                if (results.isEmpty()) {
+                    return false;
+                }
+
+                final ValueTimestampHeaders<String> result = 
results.get(0).value;
+                assertNotNull(result, "Result should not be null");
+                assertEquals(value, result.value(), "Value should match");
+                assertEquals(timestamp, result.timestamp(), "Timestamp should 
match");
+
+                // Verify headers exist but are empty (migrated from 
timestamped store without headers)
+                assertNotNull(result.headers(), "Headers should not be null 
for migrated data");
+                assertEquals(0, result.headers().toArray().length, "Headers 
should be empty for migrated data");
+
+                return true;
+            } catch (final Exception e) {
+                e.printStackTrace();
+                return false;
+            }
+        }, 60_000L, "Could not verify legacy value with empty headers in 
time.");
+    }
+
+    /**
+     * Processor for TimestampedWindowStore (without headers).
+     */
+    private static class TimestampedWindowedProcessor implements 
Processor<String, String, Void, Void> {
+        private TimestampedWindowStore<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long windowStart = record.timestamp() - (record.timestamp() 
% WINDOW_SIZE_MS);
+            store.put(record.key(), ValueAndTimestamp.make(record.value(), 
record.timestamp()), windowStart);
+        }
+    }
+
+    /**
+     * Processor for TimestampedWindowStoreWithHeaders (with headers).
+     */
+    private static class TimestampedWindowedWithHeadersProcessor implements 
Processor<String, String, Void, Void> {
+        private TimestampedWindowStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(WINDOW_STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            final long windowStart = record.timestamp() - (record.timestamp() 
% WINDOW_SIZE_MS);
+            store.put(record.key(),
+                ValueTimestampHeaders.make(record.value(), record.timestamp(), 
record.headers()),
+                windowStart);
+        }
+    }
+
+    @Test
+    public void 
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
 throws Exception {
+        final Properties props = props();
+        final long baseTime = setupAndPopulateWindowStoreWithHeaders(props, 
singletonList(KeyValue.pair("key1", 100L)));
+        kafkaStreams = null;
+
+        // Attempt to downgrade to non-headers window store
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.timestampedWindowStoreBuilder(
+                    Stores.persistentTimestampedWindowStore(WINDOW_STORE_NAME,
+                        Duration.ofMillis(RETENTION_MS),
+                        Duration.ofMillis(WINDOW_SIZE_MS),
+                        false),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedWindowedProcessor::new, WINDOW_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        try {
+            kafkaStreams.start();
+
+            final ReadOnlyWindowStore<String, ValueAndTimestamp<String>> store 
=
+                IntegrationTestUtils.getStore(WINDOW_STORE_NAME, kafkaStreams, 
QueryableStoreTypes.timestampedWindowStore());
+
+            final long windowStart = baseTime + 100 - ((baseTime + 100) % 
WINDOW_SIZE_MS);
+            final ValueAndTimestamp<String> result = store.fetch("key1", 
windowStart);
+
+            // If we can read the data correctly, the test should fail
+            if (result != null && result.value().equals("value1") && 
result.timestamp() == (baseTime + 100)) {
+                throw new AssertionError("Expected data corruption or read 
failure when downgrading without cleanup, " +
+                    "but data was read correctly (value=" + result.value() + 
", timestamp=" + result.timestamp() + "). " +
+                    "Downgrades without cleanup should not succeed.");
+            }
+            // Otherwise (null or corrupted data), the downgrade failed as 
expected

Review Comment:
   We should never reach this part, right? We either crash with an exception, or 
we throw the assertion error?
   
   So we should also fail the test here explicitly, to avoid the test passing 
incorrectly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to