bbejeck commented on code in PR #21748:
URL: https://github.com/apache/kafka/pull/21748#discussion_r2933049388


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -1599,4 +1264,440 @@ private void 
setupAndPopulateKeyValueStoreWithHeaders(final Properties props) th
 
         kafkaStreams.close();
     }
+
+    // ==================== Session Store Tests ====================
+
+    @Test
+    public void 
shouldMigratePersistentSessionStoreToSessionStoreWithHeadersUsingPapi() throws 
Exception {
+        shouldMigrateSessionStoreToSessionStoreWithHeaders(true);
+    }
+
+    @Test
+    public void 
shouldMigrateInMemorySessionStoreToSessionStoreWithHeadersUsingPapi() throws 
Exception {
+        shouldMigrateSessionStoreToSessionStoreWithHeaders(false);
+    }
+
+    private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final 
boolean isPersistent) throws Exception {
+        // Phase 1: Run with plain SessionStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    isPersistent ? 
Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)) :
+                        Stores.inMemorySessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+        processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+        processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close(Duration.ofSeconds(5L));
+        kafkaStreams = null;
+
+        // Phase 2: Restart with SessionStoreWithHeaders (headers-aware 
supplier)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
+        newBuilder.addStateStore(
+                Stores.sessionStoreBuilderWithHeaders(
+                    isPersistent ? 
Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)) :
+                        Stores.inMemorySessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(() -> {
+                final SessionWithHeadersProcessor sessionStore = new 
SessionWithHeadersProcessor();
+                processorRef.set(sessionStore);
+                return sessionStore;
+            }, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        // Verify legacy data can be read with empty headers
+        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, 
processorRef);
+
+        // Process new records with headers
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "migration-test".getBytes());
+
+        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, headers, processorRef);
+        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, headers, processorRef);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void shouldProxySessionStoreToSessionStoreWithHeaders() throws 
Exception {
+        // Phase 1: Run with plain SessionStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+        processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+        processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Phase 2: Restart with headers-aware builder but non-headers 
supplier (proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
+        newBuilder.addStateStore(
+                Stores.sessionStoreBuilderWithHeaders(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),  // non-headers supplier!
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(() -> {
+                final SessionWithHeadersProcessor p = new 
SessionWithHeadersProcessor();
+                processorRef.set(p);
+                return p;
+            }, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
+        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);
+
+        // Verify legacy data can be read with empty headers
+        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, 
processorRef);
+        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, 
processorRef);
+
+        // In proxy mode, headers are stripped when writing to non-headers 
store
+        // So we expect empty headers when reading back
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime 
+ 400, headers, expectedHeaders, processorRef);
+        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime 
+ 500, headers, expectedHeaders, processorRef);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void shouldFailDowngradeFromSessionStoreWithHeadersToSessionStore() 
throws Exception {
+        final Properties props = props();
+        setupAndPopulateSessionStoreWithHeaders(props);
+        kafkaStreams = null;
+
+        // Attempt to downgrade to plain session store
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        boolean exceptionThrown = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof ProcessorStateException &&
+                    cause.getMessage() != null &&
+                    cause.getMessage().contains("incompatible settings")) {
+                    exceptionThrown = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException 
about incompatible settings, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be 
thrown when attempting to downgrade from headers-aware to plain session store");
+        }
+    }
+
+    @Test
+    public void 
shouldSuccessfullyDowngradeFromSessionStoreWithHeadersToSessionStoreAfterCleanup()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateSessionStoreWithHeaders(props);
+
+        kafkaStreams.cleanUp(); // Delete local state
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long newTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key3", "value3", newTime + 300);
+        processSessionKeyValueAndVerify("key4", "value4", newTime + 400);
+
+        kafkaStreams.close();
+    }
+
+    // ==================== Session Store Helper Methods ====================
+
+    private void processSessionKeyValueAndVerify(final String key,
+                                                  final String value,
+                                                  final long timestamp) throws 
Exception {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(() -> {
+            try {
+                final ReadOnlySessionStore<String, String> store =
+                    IntegrationTestUtils.getStore(SESSION_STORE_NAME, 
kafkaStreams, QueryableStoreTypes.sessionStore());
+
+                if (store == null) {
+                    return false;
+                }
+
+                try (final KeyValueIterator<Windowed<String>, String> iterator 
= store.fetch(key)) {
+                    while (iterator.hasNext()) {
+                        final KeyValue<Windowed<String>, String> kv = 
iterator.next();
+                        if (kv.key.key().equals(key) && 
kv.value.equals(value)) {
+                            return true;
+                        }
+                    }
+                }
+                return false;
+            } catch (final Exception e) {
+                return false;
+            }
+        }, 60_000L, "Could not verify session value in time.");
+    }
+
+    private void verifySessionValueWithEmptyHeaders(final String key,
+                                                    final String value,
+                                                    final long timestamp,
+                                                    final 
AtomicReference<SessionWithHeadersProcessor> processorRef) throws Exception {
+        TestUtils.waitForCondition(() -> {
+            try {
+                if (processorRef.get() == null) {
+                    return false;
+                }
+                final ReadOnlySessionStore<String, 
AggregationWithHeaders<String>> store = processorRef.get().store();

Review Comment:
   The session store integration tests access state stores directly from within 
the processor rather than through Interactive Queries (IQ) via 
`IntegrationTestUtils.getStore()`. This is necessary because IQ for 
`SessionStoreWithHeaders` goes through `ReadOnlySessionStoreFacade` → 
`SessionStoreIteratorFacade`, which strips the `AggregationWithHeaders` wrapper 
and returns only the raw aggregation value. Since these tests need to verify 
that headers are correctly preserved (or empty after migration), direct store 
access is required to inspect the full `AggregationWithHeaders` object.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to