This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b603b5306e4 KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders 
(N/N) (#21529)
b603b5306e4 is described below

commit b603b5306e4d1cc837c709886a35f652a7d337b1
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 25 14:38:17 2026 +0100

    KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (N/N) (#21529)
    
    This PR implements the upgrade integration tests for
    `TimestampedKeyValueStoreWithHeaders` introduced in KIP-1271.
---
 .../HeadersStoreUpgradeIntegrationTest.java        | 352 +++++++++++++++++++++
 .../TimestampedKeyValueStoreWithHeadersTest.java   |   6 +-
 ...=> TimestampedStoreUpgradeIntegrationTest.java} |   2 +-
 .../processor/internals/StateManagerUtil.java      |  13 +-
 .../state/internals/CachingKeyValueStore.java      |   3 +
 .../streams/state/internals/RecordConverters.java  |  69 ++++
 ...TimestampedKeyValueStoreBuilderWithHeaders.java |   2 +-
 .../streams/state/internals/WrappedStateStore.java |  11 +
 .../state/internals/RecordConvertersTest.java      |  21 ++
 ...stampedKeyValueStoreBuilderWithHeadersTest.java |  33 --
 10 files changed, 471 insertions(+), 41 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
new file mode 100644
index 00000000000..9b6371f5a19
--- /dev/null
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+
+@Tag("integration")
+public class HeadersStoreUpgradeIntegrationTest {
+    private static final String STORE_NAME = "store";
+    private String inputStream;
+
+    private KafkaStreams kafkaStreams;
+
+    public static final EmbeddedKafkaCluster CLUSTER = new 
EmbeddedKafkaCluster(1);
+
+    @BeforeAll
+    public static void startCluster() throws IOException {
+        CLUSTER.start();
+    }
+
+    @AfterAll
+    public static void closeCluster() {
+        CLUSTER.stop();
+    }
+
+    public String safeTestName;
+
+    @BeforeEach
+    public void createTopics(final TestInfo testInfo) throws Exception {
+        safeTestName = safeUniqueTestName(testInfo);
+        inputStream = "input-stream-" + safeTestName;
+        CLUSTER.createTopic(inputStream);
+    }
+
+    private Properties props() {
+        final Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 
1000L);
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
+        return streamsConfiguration;
+    }
+
+    @AfterEach
+    public void shutdown() {
+        if (kafkaStreams != null) {
+            kafkaStreams.close(Duration.ofSeconds(30L));
+            kafkaStreams.cleanUp();
+        }
+    }
+
+    @Test
+    public void 
shouldMigrateInMemoryTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+    }
+
+    @Test
+    public void 
shouldMigratePersistentTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        
shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+    }
+
+    private void 
shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
 boolean persistentStore) throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                persistentStore ? 
Stores.persistentTimestampedKeyValueStore(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                Serdes.String(),
+                Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
+        processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
+        processKeyValueAndVerifyTimestampedValue("key3", "value3", 33L);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                persistentStore ? 
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) : 
Stores.inMemoryKeyValueStore(STORE_NAME),
+                Serdes.String(),
+                Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
+        kafkaStreams.start();
+
+        // Verify legacy data can be read with empty headers
+        verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
+        verifyLegacyValuesWithEmptyHeaders("key2", "value2", 22L);
+        verifyLegacyValuesWithEmptyHeaders("key3", "value3", 33L);
+
+        // Process new records with headers
+        final Headers headers = new RecordHeaders();
+        headers.add("source", "test".getBytes());
+
+        processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3", 
333L, headers, headers);
+        processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4", 
444L, headers, headers);
+
+        kafkaStreams.close();
+    }
+
+    @Test
+    public void 
shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
 throws Exception {
+        final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+        streamsBuilderForOldStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilder(
+                Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                Serdes.String(),
+                Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+        final Properties props = props();
+        kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), 
props);
+        kafkaStreams.start();
+
+        processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
+        processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
+        processKeyValueAndVerifyTimestampedValue("key3", "value3", 33L);
+
+        kafkaStreams.close();
+        kafkaStreams = null;
+
+
+
+        final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+        streamsBuilderForNewStore.addStateStore(
+            Stores.timestampedKeyValueStoreBuilderWithHeaders(
+                Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+                Serdes.String(),
+                Serdes.String()))
+            .stream(inputStream, Consumed.with(Serdes.String(), 
Serdes.String()))
+            .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+        kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), 
props);
+        kafkaStreams.start();
+
+        // Verify legacy data can be read with empty headers
+        verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
+        verifyLegacyValuesWithEmptyHeaders("key2", "value2", 22L);
+        verifyLegacyValuesWithEmptyHeaders("key3", "value3", 33L);
+
+        // Process new records with headers
+        final RecordHeaders headers = new RecordHeaders();
+        headers.add("source", "proxy-test".getBytes());
+        final Headers expectedHeaders = new RecordHeaders();
+
+        processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3", 
333L, headers, expectedHeaders);
+        processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4", 
444L, headers, expectedHeaders);
+
+        kafkaStreams.close();
+    }
+
+    private <K, V> void processKeyValueAndVerifyTimestampedValue(final K key,
+                                                                 final V value,
+                                                                 final long 
timestamp)
+        throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store 
=
+                        IntegrationTestUtils.getStore(STORE_NAME, 
kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
+
+                    if (store == null) {
+                        return false;
+                    }
+
+                    final ValueAndTimestamp<V> result = store.get(key);
+                    return result != null && result.value().equals(value) && 
result.timestamp() == timestamp;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
+    private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final 
K key,
+                                                                        final 
V value,
+                                                                        final 
long timestamp,
+                                                                        final 
Headers headers,
+                                                                        final 
Headers expectedHeaders)
+        throws Exception {
+
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            inputStream,
+            singletonList(KeyValue.pair(key, value)),
+            TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+                StringSerializer.class,
+                StringSerializer.class),
+            headers,
+            timestamp,
+            false);
+
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;
+
+                    final ValueTimestampHeaders<V> result = store.get(key);
+                    return result != null
+                        && result.value().equals(value)
+                        && result.timestamp() == timestamp
+                        && result.headers().equals(expectedHeaders);
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
+    private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
+                                                           final V value,
+                                                           final long 
timestamp) throws Exception {
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>> 
store = IntegrationTestUtils
+                        .getStore(STORE_NAME, kafkaStreams, 
QueryableStoreTypes.keyValueStore());
+
+                    if (store == null)
+                        return false;
+
+                    final ValueTimestampHeaders<V> result = store.get(key);
+                    return result != null
+                        && result.value().equals(value)
+                        && result.timestamp() == timestamp
+                        && result.headers().toArray().length == 0;
+                } catch (final Exception swallow) {
+                    swallow.printStackTrace();
+                    System.err.println(swallow.getMessage());
+                    return false;
+                }
+            },
+            60_000L,
+            "Could not get expected result in time.");
+    }
+
+    private static class TimestampedKeyValueProcessor implements 
Processor<String, String, Void, Void> {
+        private TimestampedKeyValueStore<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            store.put(record.key(), ValueAndTimestamp.make(record.value(), 
record.timestamp()));
+        }
+    }
+
+    private static class TimestampedKeyValueWithHeadersProcessor implements 
Processor<String, String, Void, Void> {
+        private TimestampedKeyValueStoreWithHeaders<String, String> store;
+
+        @Override
+        public void init(final ProcessorContext<Void, Void> context) {
+            store = context.getStateStore(STORE_NAME);
+        }
+
+        @Override
+        public void process(final Record<String, String> record) {
+            store.put(record.key(), ValueTimestampHeaders.make(record.value(), 
record.timestamp(), record.headers()));
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
index 767615bd105..b7cbdb2d82d 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
@@ -428,9 +428,9 @@ public class TimestampedKeyValueStoreWithHeadersTest {
     @SuppressWarnings("varargs")
     @SafeVarargs
     private final int produceDataToTopicWithHeaders(final String topic,
-                                                     final long timestamp,
-                                                     final Headers headers,
-                                                     final KeyValue<Integer, 
String>... keyValues) {
+                                                    final long timestamp,
+                                                    final Headers headers,
+                                                    final KeyValue<Integer, 
String>... keyValues) {
         IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
             topic,
             Arrays.asList(keyValues),
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedStoreUpgradeIntegrationTest.java
similarity index 99%
rename from 
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
rename to 
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedStoreUpgradeIntegrationTest.java
index cb248138d0e..dbeaa628d4c 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedStoreUpgradeIntegrationTest.java
@@ -62,7 +62,7 @@ import static java.util.Collections.singletonList;
 import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
 
 @Tag("integration")
-public class StoreUpgradeIntegrationTest {
+public class TimestampedStoreUpgradeIntegrationTest {
     private static final String STORE_NAME = "store";
     private String inputStream;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 6990bd29029..6706ed89543 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -35,7 +35,9 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.identity;
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static 
org.apache.kafka.streams.state.internals.WrappedStateStore.isHeadersAware;
 import static 
org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;
 import static 
org.apache.kafka.streams.state.internals.WrappedStateStore.isVersioned;
 
@@ -50,9 +52,14 @@ final class StateManagerUtil {
     private StateManagerUtil() {}
 
     static RecordConverter converterForStore(final StateStore store) {
-        // should not prepend timestamp when restoring records for versioned 
store, as
-        // timestamp is used separately during put() process for restore of 
versioned stores
-        return (isTimestamped(store) && !isVersioned(store)) ? 
rawValueToTimestampedValue() : identity();
+        if (isHeadersAware(store)) {
+            return rawValueToHeadersValue();
+        } else if (isTimestamped(store) && !isVersioned(store)) {
+            // should not prepend timestamp when restoring records for 
versioned store, as
+            // timestamp is used separately during put() process for restore 
of versioned stores
+            return rawValueToTimestampedValue();
+        }
+        return identity();
     }
 
     static boolean checkpointNeeded(final boolean enforceCheckpoint,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index e8588d9146a..0e31ef87fa7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -177,6 +177,9 @@ public class CachingKeyValueStore
                                            final Position mergedPosition,
                                            final PositionBound positionBound,
                                            final QueryConfig config) {
+        if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
+            throw new UnsupportedOperationException("Queries (IQv2) are not 
supported for timestamped key-value stores with headers yet.");
+        }
         QueryResult<R> result = null;
         final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>) 
query;
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index ad3c91e8073..8eb7be4f5de 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -17,7 +17,14 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.utils.ByteUtils;
 
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 
 public final class RecordConverters {
@@ -46,6 +53,35 @@ public final class RecordConverters {
         );
     };
 
+    private static final RecordConverter RAW_TO_WITH_HEADERS_INSTANCE = record 
-> {
+        final byte[] rawValue = record.value();
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        final byte[] recordValue = reconstructFromRaw(
+            rawValue,
+            record.timestamp(),
+            record.headers()
+        );
+
+        return new ConsumerRecord<>(
+            record.topic(),
+            record.partition(),
+            record.offset(),
+            record.timestamp(),
+            record.timestampType(),
+            record.serializedKeySize(),
+            record.serializedValueSize(),
+            record.key(),
+            recordValue,
+            record.headers(),
+            record.leaderEpoch()
+        );
+    };
+
+    public static RecordConverter rawValueToHeadersValue() {
+        return RAW_TO_WITH_HEADERS_INSTANCE;
+    }
+
     // privatize the constructor so the class cannot be instantiated (only 
used for its static members)
     private RecordConverters() {}
 
@@ -56,4 +92,37 @@ public final class RecordConverters {
     public static RecordConverter identity() {
         return IDENTITY_INSTANCE;
     }
+
+    /**
+     * Reconstructs the ValueTimestampHeaders format from raw value bytes, 
timestamp, and headers.
+     * Used during state restoration from changelog topics.
+     *
+     * @param rawValue the raw value bytes
+     * @param timestamp the timestamp
+     * @param headers the headers
+     * @return the serialized ValueTimestampHeaders format
+     */
+    static byte[] reconstructFromRaw(final byte[] rawValue, final long 
timestamp, final Headers headers) {
+        if (rawValue == null) {
+            return null;
+        }
+        final byte[] rawTimestamp;
+        try (LongSerializer timestampSerializer = new LongSerializer()) {
+            rawTimestamp = timestampSerializer.serialize("", timestamp);
+        }
+        final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             final DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);
+            out.write(rawHeaders);
+            out.write(rawTimestamp);
+            out.write(rawValue);
+
+            return baos.toByteArray();
+        } catch (final IOException e) {
+            throw new SerializationException("Failed to reconstruct 
ValueTimestampHeaders", e);
+        }
+    }
 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index 0c09eb2027d..be9ecb38bc9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -179,7 +179,7 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K, 
V>
 
         @Override
         public Position getPosition() {
-            throw new UnsupportedOperationException("Position is not supported 
by timestamped key-value stores with headers yet.");
+            return wrapped().getPosition();
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 0f0f7ee62ee..d1ce967bcc8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.Query;
 import org.apache.kafka.streams.query.QueryConfig;
 import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.TimestampedBytesStore;
 import org.apache.kafka.streams.state.VersionedBytesStore;
 
@@ -55,6 +56,16 @@ public abstract class WrappedStateStore<S extends 
StateStore, K, V> implements S
         }
     }
 
+    public static boolean isHeadersAware(final StateStore stateStore) {
+        if (stateStore instanceof HeadersBytesStore) {
+            return true;
+        } else if (stateStore instanceof WrappedStateStore) {
+            return isHeadersAware(((WrappedStateStore<?, ?, ?>) 
stateStore).wrapped());
+        } else {
+            return false;
+        }
+    }
+
     private final S wrapped;
 
     public WrappedStateStore(final S wrapped) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
index 2f222ed6a8f..93d08c00293 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.state.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.record.TimestampType;
 
@@ -25,6 +26,7 @@ import org.junit.jupiter.api.Test;
 import java.nio.ByteBuffer;
 import java.util.Optional;
 
+import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
 import static 
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,11 +34,14 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 public class RecordConvertersTest {
 
     private final RecordConverter timestampedValueConverter = 
rawValueToTimestampedValue();
+    private final RecordConverter headersValueConverter = 
rawValueToHeadersValue();
+
 
     @Test
     public void shouldPreserveNullValueOnConversion() {
         final ConsumerRecord<byte[], byte[]> nullValueRecord = new 
ConsumerRecord<>("", 0, 0L, new byte[0], null);
         assertNull(timestampedValueConverter.convert(nullValueRecord).value());
+        assertNull(headersValueConverter.convert(nullValueRecord).value());
     }
 
     @Test
@@ -50,4 +55,20 @@ public class RecordConvertersTest {
         final byte[] actualValue = 
timestampedValueConverter.convert(inputRecord).value();
         assertArrayEquals(expectedValue, actualValue);
     }
+
+    @Test
+    public void 
shouldAddTimestampAndHeadersToValueOnConversionWhenValueIsNotNull() {
+        final long timestamp = 10L;
+        final byte[] value = new byte[1];
+        final Headers headers = new RecordHeaders().add("header-key", 
"header-value".getBytes());
+        final ConsumerRecord<byte[], byte[]> inputRecord = new 
ConsumerRecord<>(
+            "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0, 0, new 
byte[0], value,
+            headers, Optional.empty());
+        // Expected format: 
[headersSize(varint)][headersBytes][timestamp(8)][value]
+        final byte[] expectedValue =
+            {50, 2, 20, 104, 101, 97, 100, 101, 114, 45, 107, 101, 121, 24, 
104, 101, 97, 100, 101,
+                114, 45, 118, 97, 108, 117, 101, 0, 0, 0, 0, 0, 0, 0, 10, 0};
+        final byte[] actualValue = 
headersValueConverter.convert(inputRecord).value();
+        assertArrayEquals(expectedValue, actualValue);
+    }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index 3929a11667a..a49e85bbb1a 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.query.KeyQuery;
 import org.apache.kafka.streams.query.PositionBound;
 import org.apache.kafka.streams.query.QueryConfig;
-import org.apache.kafka.streams.state.HeadersBytesStore;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.ValueTimestampHeaders;
 
@@ -336,37 +334,6 @@ public class 
TimestampedKeyValueStoreBuilderWithHeadersTest {
         assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
     }
 
-    @Test
-    public void shouldThrowOnGetPositionForInMemoryStoreMarker() {
-        when(supplier.name()).thenReturn("test-store");
-        when(supplier.metricsScope()).thenReturn("metricScope");
-        when(supplier.get()).thenReturn(new 
InMemoryKeyValueStore("test-store"));
-
-        builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
-                supplier,
-                Serdes.String(),
-                Serdes.String(),
-                new MockTime()
-        );
-
-        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
builder
-                .withLoggingDisabled()
-                .withCachingDisabled()
-                .build();
-
-        // Unwrap to get directly to the 
InMemoryTimestampedKeyValueStoreWithHeadersMarker
-        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
-        assertInstanceOf(KeyValueStore.class, wrapped);
-        assertInstanceOf(HeadersBytesStore.class, wrapped);
-
-        final UnsupportedOperationException exception = assertThrows(
-                UnsupportedOperationException.class,
-            wrapped::getPosition
-        );
-
-        assertTrue(exception.getMessage().contains("Position is not supported 
by timestamped key-value stores with headers yet."));
-    }
-
     @Test
     public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
         when(supplier.name()).thenReturn("test-store");

Reply via email to