This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ab5a07abadf KAFKA-20218: Add downgrade tests in 
TimestampedKeyValueStoreWithHeadersTest (#21667)
ab5a07abadf is described below

commit ab5a07abadf0ee29faece540862792a6368ff13d
Author: Alieh Saeedi <[email protected]>
AuthorDate: Mon Mar 9 22:48:56 2026 +0100

    KAFKA-20218: Add downgrade tests in TimestampedKeyValueStoreWithHeadersTest 
(#21667)
    
    Adding two integration tests to verify whether downgrading from header
    key value store to timestamped key value store is possible. Directly
    downgrading is not supported; the only option is to delete the store
    entirely and restore it from the changelog.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 126 +++++++++++++++++++++
 1 file changed, 126 insertions(+)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
index 2976f0713fe..a9a3c9a9e32 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -269,6 +269,34 @@ public class HeadersStoreUpgradeIntegrationTest {
             "Could not get expected result in time.");
     }
 
+    private <K, V> void verifyLegacyTimestampedValue(final K key,
+                                                     final V value,
+                                                     final long timestamp)
+        throws Exception {
+
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store 
=
+                        IntegrationTestUtils.getStore(STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
+
+                    if (store == null) {
+                        return false;
+                    }
+
+                    final ValueAndTimestamp<V> result = store.get(key);
+                    return result != null && result.value().equals(value) && 
result.timestamp() == timestamp;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
+
     private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final 
K key,
                                                                         final 
V value,
                                                                         final 
long timestamp,
@@ -653,6 +681,82 @@ public class HeadersStoreUpgradeIntegrationTest {
         }
     }
 
+    @Test
+    public void 
shouldFailDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStore()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateKeyValueStoreWithHeaders(props);
+        kafkaStreams = null;
+
+        // Attempt to downgrade to non-headers key-value store
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+
+        boolean exceptionThrown = false;
+        try {
+            kafkaStreams.start();
+        } catch (final Exception e) {
+            Throwable cause = e;
+            while (cause != null) {
+                if (cause instanceof ProcessorStateException &&
+                    cause.getMessage() != null &&
+                    cause.getMessage().contains("headers-aware") &&
+                    cause.getMessage().contains("Downgrade")) {
+                    exceptionThrown = true;
+                    break;
+                }
+                cause = cause.getCause();
+            }
+
+            if (!exceptionThrown) {
+                throw new AssertionError("Expected ProcessorStateException 
about downgrade not being supported, but got: " + e.getMessage(), e);
+            }
+        } finally {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+        }
+
+        if (!exceptionThrown) {
+            throw new AssertionError("Expected ProcessorStateException to be 
thrown when attempting to downgrade from headers-aware to non-headers key-value 
store");
+        }
+    }
+
+    @Test
+    public void 
shouldSuccessfullyDowngradeFromTimestampedKeyValueStoreWithHeadersToTimestampedKeyValueStoreAfterCleanup()
 throws Exception {
+        final Properties props = props();
+        setupAndPopulateKeyValueStoreWithHeaders(props);
+
+        kafkaStreams.cleanUp(); // Delete local state
+        kafkaStreams = null;
+
+        final StreamsBuilder downgradedBuilder = new StreamsBuilder();
+        downgradedBuilder.addStateStore(
+                Stores.timestampedKeyValueStoreBuilder(
+                    Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(downgradedBuilder.build(), props);
+        kafkaStreams.start();
+
+        // verify legacy key, values
+        verifyLegacyTimestampedValue("key1", "value1", 11L);
+        verifyLegacyTimestampedValue("key2", "value2", 22L);
+
+        processKeyValueAndVerifyTimestampedValue("key3", "value3", 333L);
+        processKeyValueAndVerifyTimestampedValue("key4", "value4", 444L);
+
+        kafkaStreams.close();
+    }
+
     @Test
     public void 
shouldFailDowngradeFromTimestampedWindowStoreWithHeadersToTimestampedWindowStore()
 throws Exception {
         final Properties props = props();
@@ -809,6 +913,28 @@ public class HeadersStoreUpgradeIntegrationTest {
         return CLUSTER.time.milliseconds();
     }
 
+    private void setupAndPopulateKeyValueStoreWithHeaders(final Properties 
props) throws Exception {
+        final StreamsBuilder headersBuilder = new StreamsBuilder();
+        headersBuilder.addStateStore(
+                Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                    
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME),
+                    Serdes.String(),
+                    Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(headersBuilder.build(), props);
+        kafkaStreams.start();
+
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "test".getBytes());
+
+        processKeyValueWithTimestampAndHeadersAndVerify("key1", "value1", 11L, 
headers, headers);
+        processKeyValueWithTimestampAndHeadersAndVerify("key2", "value2", 22L, 
headers, headers);
+
+        kafkaStreams.close();
+    }
+
     private void produceRecordWithHeaders(final String key, final String 
value, final long timestamp) throws Exception {
         final Headers headers = new RecordHeaders();
         headers.add("source", "test".getBytes());

Reply via email to