bbejeck commented on code in PR #21748:
URL: https://github.com/apache/kafka/pull/21748#discussion_r2933056234


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java:
##########
@@ -1599,4 +1264,440 @@ private void 
setupAndPopulateKeyValueStoreWithHeaders(final Properties props) th
 
         kafkaStreams.close();
     }
+
+    // ==================== Session Store Tests ====================
+
    // Verifies that a persistent (RocksDB-backed) plain SessionStore can be
    // migrated to the headers-aware session store via the Processor API (PAPI).
    @Test
    public void shouldMigratePersistentSessionStoreToSessionStoreWithHeadersUsingPapi() throws Exception {
        shouldMigrateSessionStoreToSessionStoreWithHeaders(true);
    }
+
    // Verifies that an in-memory plain SessionStore can be migrated to the
    // headers-aware session store via the Processor API (PAPI).
    @Test
    public void shouldMigrateInMemorySessionStoreToSessionStoreWithHeadersUsingPapi() throws Exception {
        shouldMigrateSessionStoreToSessionStoreWithHeaders(false);
    }
+
    /**
     * Two-phase upgrade test: writes records through a plain {@code SessionStore},
     * restarts the topology with a headers-aware session store over the same state
     * directory and changelog, and verifies that (a) pre-upgrade records are readable
     * with empty headers and (b) new records round-trip their headers.
     *
     * @param isPersistent whether to use the persistent (RocksDB) or in-memory store
     */
    private void shouldMigrateSessionStoreToSessionStoreWithHeaders(final boolean isPersistent) throws Exception {
        // Phase 1: Run with plain SessionStore
        final StreamsBuilder oldBuilder = new StreamsBuilder();
        oldBuilder.addStateStore(
                Stores.sessionStoreBuilder(
                    isPersistent ? Stores.persistentSessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)) :
                        Stores.inMemorySessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
                    Serdes.String(),
                    Serdes.String()))
            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
            .process(SessionProcessor::new, SESSION_STORE_NAME);

        final Properties props = props();
        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);

        // Use the cluster clock so session windows line up with broker timestamps.
        final long baseTime = CLUSTER.time.milliseconds();
        processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
        processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
        processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);

        // Bounded close so a hung shutdown cannot stall the test before the restart.
        kafkaStreams.close(Duration.ofSeconds(5L));
        kafkaStreams = null;

        // Phase 2: Restart with SessionStoreWithHeaders (headers-aware supplier)
        final StreamsBuilder newBuilder = new StreamsBuilder();
        // Captures the processor instance created on the Streams runtime thread so the
        // test thread can reach the store directly (IQ strips the headers wrapper).
        final AtomicReference<SessionWithHeadersProcessor> processorRef = new AtomicReference<>();
        newBuilder.addStateStore(
                Stores.sessionStoreBuilderWithHeaders(
                    // NOTE(review): the in-memory branch uses the NON-headers supplier,
                    // asymmetric with the persistent branch above. For isPersistent=false
                    // this exercises the proxy/adapter path (same as the proxy test) rather
                    // than a native in-memory headers store — confirm whether
                    // Stores.inMemorySessionStoreWithHeaders was intended here.
                    isPersistent ? Stores.persistentSessionStoreWithHeaders(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)) :
                        Stores.inMemorySessionStore(SESSION_STORE_NAME, Duration.ofMillis(RETENTION_MS)),
                    Serdes.String(),
                    Serdes.String()))
            .stream(inputStream, Consumed.with(Serdes.String(), Serdes.String()))
            .process(() -> {
                final SessionWithHeadersProcessor sessionStore = new SessionWithHeadersProcessor();
                processorRef.set(sessionStore);
                return sessionStore;
            }, SESSION_STORE_NAME);

        kafkaStreams = new KafkaStreams(newBuilder.build(), props);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(kafkaStreams);

        // Verify legacy data can be read with empty headers
        verifySessionValueWithEmptyHeaders("key1", "value1", baseTime + 100, processorRef);
        verifySessionValueWithEmptyHeaders("key2", "value2", baseTime + 200, processorRef);
        verifySessionValueWithEmptyHeaders("key3", "value3", baseTime + 300, processorRef);

        // Process new records with headers
        final Headers headers = new RecordHeaders();
        headers.add("source", "migration-test".getBytes());

        processSessionKeyValueWithHeadersAndVerify("key4", "value4", baseTime + 400, headers, headers, processorRef);
        processSessionKeyValueWithHeadersAndVerify("key5", "value5", baseTime + 500, headers, headers, processorRef);

        kafkaStreams.close();
    }
+
+    @Test
+    public void shouldProxySessionStoreToSessionStoreWithHeaders() throws 
Exception {
+        // Phase 1: Run with plain SessionStore
+        final StreamsBuilder oldBuilder = new StreamsBuilder();
+        oldBuilder.addStateStore(
+                Stores.sessionStoreBuilder(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(SessionProcessor::new, SESSION_STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(oldBuilder.build(), props);
+        kafkaStreams.start();
+
+        final long baseTime = CLUSTER.time.milliseconds();
+        processSessionKeyValueAndVerify("key1", "value1", baseTime + 100);
+        processSessionKeyValueAndVerify("key2", "value2", baseTime + 200);
+        processSessionKeyValueAndVerify("key3", "value3", baseTime + 300);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        // Phase 2: Restart with headers-aware builder but non-headers 
supplier (proxy/adapter mode)
+        final StreamsBuilder newBuilder = new StreamsBuilder();
+        final AtomicReference<SessionWithHeadersProcessor> processorRef = new 
AtomicReference<>();
+        newBuilder.addStateStore(
+                Stores.sessionStoreBuilderWithHeaders(
+                    Stores.persistentSessionStore(SESSION_STORE_NAME, 
Duration.ofMillis(RETENTION_MS)),  // non-headers supplier!
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(() -> {

Review Comment:
    `AtomicReference` is used to capture the processor instance created by the
Kafka Streams runtime thread so the test thread can access the store directly.
This is needed because IQ goes through `ReadOnlySessionStoreFacade`, which strips
`AggregationWithHeaders` and therefore prevents header verification.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to