mjsax commented on code in PR #21386:
URL: https://github.com/apache/kafka/pull/21386#discussion_r2761885808


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Serializer for Kafka Headers.
+ *
+ * Serialization format (per KIP-1271):
+ * [NumHeaders(varint)][Header1][Header2]...
+ *
+ * Each header:
+ * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
+ *
+ * Note: ValueLength is -1 for null values (encoded as varint).
+ * All integers are encoded as varints (signed varint encoding).
+ *
+ * This serializer produces the headersBytes portion. The headersSize prefix
+ * is added by the outer serializer (e.g., ValueTimestampHeadersSerializer).
+ *
+ * This is used by KIP-1271 to serialize headers for storage in state stores.
+ */
+public class HeadersSerializer {
+
+    /**
+     * Serializes headers into a byte array using varint encoding per KIP-1271.
+     * <p>
+     * The output format is [count][header1][header2]... without a size prefix.
+     * The size prefix is added by the outer serializer that uses this.
+     *
+     * @param headers the headers to serialize (can be null)
+     * @return the serialized byte array
+     */
+    public byte[] serialize(final Headers headers) {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            if (headers == null) {
+                // Empty headers: just [NumHeaders(varint) = 0]
+                ByteUtils.writeVarint(0, out);
+                return baos.toByteArray();
+            }
+
+            // Count headers
+            int headerCount = 0;
+            final Iterator<Header> iterator = headers.iterator();
+            while (iterator.hasNext()) {

Review Comment:
   Seems we are iterating over the headers twice, here and below. Can we avoid this?
   
   Would using `headers.toArray()` help -- we could access `size` directly for this case.
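
   An illustrative, self-contained sketch of the suggested single-pass approach. `MiniHeader` and the local `writeVarint` are hypothetical stand-ins for Kafka's `Header` and `ByteUtils.writeVarint` (assuming the signed/zigzag varint encoding the javadoc describes), so the sketch runs without Kafka on the classpath:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class SinglePassSerializeSketch {
    // Stand-in for Kafka's Header
    static class MiniHeader {
        final String key;
        final byte[] value;
        MiniHeader(final String key, final byte[] value) { this.key = key; this.value = value; }
    }

    // Mirrors the signed (zigzag + base-128) varint encoding of ByteUtils.writeVarint
    static void writeVarint(final int value, final DataOutputStream out) throws IOException {
        int v = (value << 1) ^ (value >> 31); // zigzag: 0 -> 0, -1 -> 1, 1 -> 2
        while ((v & 0xffffff80) != 0) {
            out.writeByte((v & 0x7f) | 0x80);
            v >>>= 7;
        }
        out.writeByte(v);
    }

    // toArray() exposes the count up front, so no separate counting pass is needed
    public static byte[] serialize(final MiniHeader[] headers) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            writeVarint(headers.length, out);
            for (final MiniHeader h : headers) {
                final byte[] key = h.key.getBytes(StandardCharsets.UTF_8);
                writeVarint(key.length, out);
                out.write(key);
                if (h.value == null) {
                    writeVarint(-1, out); // null-value marker per the format
                } else {
                    writeVarint(h.value.length, out);
                    out.write(h.value);
                }
            }
            out.flush();
            return baos.toByteArray();
        } catch (final IOException e) {
            throw new RuntimeException(e); // cannot happen with in-memory streams
        }
    }
}
```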



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HeadersSerializerTest {
+
+    private final HeadersSerializer serializer = new HeadersSerializer();
+
+    @Test
+    public void shouldSerializeNullHeaders() {
+        final byte[] serialized = serializer.serialize(null);
+
+        assertNotNull(serialized);
+        assertTrue(serialized.length > 0);
+        assertEquals(1, serialized.length, "Empty header should have 1 byte to indicate headers count is 0");

Review Comment:
   Should we also verify that the byte is "0"?
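
   For reference: with a signed (zigzag) varint encoding, a count of 0 does encode to the single byte `0x00`, so asserting `serialized[0] == 0` should hold. A minimal stand-alone sketch of that encoding (the local `varint` is an assumption mirroring `ByteUtils.writeVarint`, not Kafka code):

```java
import java.io.ByteArrayOutputStream;

public class VarintZeroSketch {
    // Mirrors signed (zigzag + base-128) varint encoding, as assumed for ByteUtils.writeVarint
    public static byte[] varint(final int value) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        int v = (value << 1) ^ (value >> 31); // zigzag: 0 -> 0, -1 -> 1, 1 -> 2
        while ((v & 0xffffff80) != 0) {
            out.write((v & 0x7f) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }
}
```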



##########
streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Combines a value with its timestamp and associated record headers.
+ *
+ * @param <V> the value type
+ */
+public final class ValueTimestampHeaders<V> {
+
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;

Review Comment:
   To implement lazy deserialization as discussed on the KIP, it seems we need one more member `byte[] rawHeaders`?
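
   One possible shape for that, as a hypothetical sketch (names like `rawHeaders` and `headers()` are illustrative, not the KIP's final API): keep the serialized bytes and only run the headers deserializer on first access.

```java
import java.util.function.Function;

// Hypothetical sketch of lazy header deserialization: store the raw serialized
// headers and materialize them only when headers() is first called.
public final class LazyValueTimestampHeaders<V, H> {
    private final V value;
    private final long timestamp;
    private final byte[] rawHeaders;                 // serialized headers, kept as-is
    private final Function<byte[], H> headersDeserializer;
    private H headers;                               // materialized lazily

    public LazyValueTimestampHeaders(final V value,
                                     final long timestamp,
                                     final byte[] rawHeaders,
                                     final Function<byte[], H> headersDeserializer) {
        this.value = value;
        this.timestamp = timestamp;
        this.rawHeaders = rawHeaders;
        this.headersDeserializer = headersDeserializer;
    }

    public V value() { return value; }
    public long timestamp() { return timestamp; }

    public H headers() {
        if (headers == null) {
            headers = headersDeserializer.apply(rawHeaders); // pay the cost only on access
        }
        return headers;
    }
}
```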



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersDeserializerTest.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+public class HeadersDeserializerTest {
+
+    private final HeadersSerializer serializer = new HeadersSerializer();
+    private final HeadersDeserializer deserializer = new HeadersDeserializer();
+
+    @Test
+    public void shouldDeserializeNullData() {
+        final Headers headers = deserializer.deserialize(null);
+
+        assertNotNull(headers);
+        assertEquals(0, headers.toArray().length);
+    }
+
+    @Test
+    public void shouldDeserializeEmptyData() {
+        final Headers headers = deserializer.deserialize(new byte[0]);
+
+        assertNotNull(headers);
+        assertEquals(0, headers.toArray().length);
+    }
+
+    @Test
+    public void shouldRoundTripEmptyHeaders() {
+        final Headers original = new RecordHeaders();
+        final byte[] serialized = serializer.serialize(original);
+        final Headers deserialized = deserializer.deserialize(serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(0, deserialized.toArray().length);
+    }
+
+    @Test
+    public void shouldRoundTripSingleHeader() {
+        final Headers original = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final byte[] serialized = serializer.serialize(original);
+        final Headers deserialized = deserializer.deserialize(serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(1, deserialized.toArray().length);
+
+        final Header header = deserialized.lastHeader("key1");
+        assertNotNull(header);
+        assertEquals("key1", header.key());
+        assertArrayEquals("value1".getBytes(), header.value());
+    }
+
+    @Test
+    public void shouldRoundTripMultipleHeaders() {
+        final Headers original = new RecordHeaders()
+            .add("key1", "value1".getBytes())
+            .add("key2", "value2".getBytes())
+            .add("key3", "value3".getBytes());
+        final byte[] serialized = serializer.serialize(original);
+        final Headers deserialized = deserializer.deserialize(serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(3, deserialized.toArray().length);
+
+        assertEquals("key1", deserialized.lastHeader("key1").key());
+        assertArrayEquals("value1".getBytes(), deserialized.lastHeader("key1").value());
+
+        assertEquals("key2", deserialized.lastHeader("key2").key());
+        assertArrayEquals("value2".getBytes(), deserialized.lastHeader("key2").value());
+
+        assertEquals("key3", deserialized.lastHeader("key3").key());
+        assertArrayEquals("value3".getBytes(), deserialized.lastHeader("key3").value());
+    }
+
+    @Test
+    public void shouldRoundTripHeaderWithNullValue() {
+        final Headers original = new RecordHeaders()
+            .add("key1", null);
+        final byte[] serialized = serializer.serialize(original);
+        final Headers deserialized = deserializer.deserialize(serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(1, deserialized.toArray().length);
+
+        final Header header = deserialized.lastHeader("key1");
+        assertNotNull(header);
+        assertEquals("key1", header.key());
+        assertNull(header.value());
+    }
+
+    @Test
+    public void shouldRoundTripHeaderWithEmptyValue() {
+        final Headers original = new RecordHeaders()
+            .add("key1", new byte[0]);
+        final byte[] serialized = serializer.serialize(original);
+        final Headers deserialized = deserializer.deserialize(serialized);
+
+        assertNotNull(deserialized);
+        assertEquals(1, deserialized.toArray().length);
+
+        final Header header = deserialized.lastHeader("key1");
+        assertNotNull(header);
+        assertEquals("key1", header.key());
+        assertArrayEquals(new byte[0], header.value());
+    }
+
+    @Test
+    public void shouldPreserveHeaderOrder() {

Review Comment:
   Should we merge this test with the above `shouldRoundTripMultipleHeaders`?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Serializer for Kafka Headers.
+ *
+ * Serialization format (per KIP-1271):
+ * [NumHeaders(varint)][Header1][Header2]...
+ *
+ * Each header:
+ * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
+ *
+ * Note: ValueLength is -1 for null values (encoded as varint).
+ * All integers are encoded as varints (signed varint encoding).
+ *
+ * This serializer produces the headersBytes portion. The headersSize prefix
+ * is added by the outer serializer (e.g., ValueTimestampHeadersSerializer).
+ *
+ * This is used by KIP-1271 to serialize headers for storage in state stores.
+ */
+public class HeadersSerializer {
+
+    /**
+     * Serializes headers into a byte array using varint encoding per KIP-1271.
+     * <p>
+     * The output format is [count][header1][header2]... without a size prefix.
+     * The size prefix is added by the outer serializer that uses this.
+     *
+     * @param headers the headers to serialize (can be null)
+     * @return the serialized byte array
+     */
+    public byte[] serialize(final Headers headers) {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            if (headers == null) {
+                // Empty headers: just [NumHeaders(varint) = 0]
+                ByteUtils.writeVarint(0, out);
+                return baos.toByteArray();
+            }
+
+            // Count headers
+            int headerCount = 0;
+            final Iterator<Header> iterator = headers.iterator();
+            while (iterator.hasNext()) {
+                iterator.next();
+                headerCount++;
+            }
+
+            // Write header count as varint

Review Comment:
   Avoid unnecessary comments. The line below explains itself. -- Comments should be used to explain _why_ we do something. The code itself tells us _what_ we do. No need to repeat that in a comment.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }

Review Comment:
   Missing: we should also configure the header serializer here.
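
   The pattern being asked for -- the outer serializer forwarding `configure()` to every wrapped serializer, including the headers one -- can be sketched stand-alone. All class names below (`MiniSerializer`, `OuterSerializer`, etc.) are hypothetical stand-ins, not Kafka types:

```java
import java.util.Map;

// Hypothetical stand-ins demonstrating configure() forwarding: the outer
// serializer configures every wrapped serializer, including the headers one,
// even while the headers serializer's configure() is currently a no-op.
public class ConfigureForwardingSketch {
    interface MiniSerializer {
        void configure(Map<String, ?> configs, boolean isKey);
    }

    static class RecordingSerializer implements MiniSerializer {
        boolean configured = false;
        public void configure(final Map<String, ?> configs, final boolean isKey) {
            configured = true;
        }
    }

    static class OuterSerializer implements MiniSerializer {
        final RecordingSerializer valueSerializer = new RecordingSerializer();
        final RecordingSerializer timestampSerializer = new RecordingSerializer();
        final RecordingSerializer headersSerializer = new RecordingSerializer();

        public void configure(final Map<String, ?> configs, final boolean isKey) {
            valueSerializer.configure(configs, isKey);
            timestampSerializer.configure(configs, isKey);
            headersSerializer.configure(configs, isKey); // the forwarding call the review asks for
        }
    }
}
```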



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Deserializer for Kafka Headers.
+ *
+ * Deserialization format (per KIP-1271):
+ * [NumHeaders(varint)][Header1][Header2]...
+ *
+ * Each header:
+ * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
+ *
+ * Note: ValueLength is -1 for null values (encoded as varint).
+ * All integers are decoded from varints (signed varint encoding).
+ *
+ * This deserializer expects the headersBytes portion without a size prefix.
+ * The size prefix is handled by the outer deserializer (e.g., ValueTimestampHeadersDeserializer).
+ *
+ * This is used by KIP-1271 to deserialize headers from state stores.
+ */
+public class HeadersDeserializer {
+
+    /**
+     * Deserializes headers from a byte array using varint encoding per KIP-1271.
+     * <p>
+     * The input format is [count][header1][header2]... without a size prefix.
+     *
+     * @param data the serialized byte array (can be null)
+     * @return the deserialized headers
+     */
+    public Headers deserialize(final byte[] data) {
+        if (data == null || data.length == 0) {
+            return new RecordHeaders();
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(data);
+
+        // Read header count as varint

Review Comment:
   nit: unnecessary comment (doesn't add anything)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) to be deserialized by HeadersDeserializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final HeadersDeserializer headersDeserializer;
+
+    ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
+        Objects.requireNonNull(valueDeserializer);
+        this.valueDeserializer = valueDeserializer;
+        this.timestampDeserializer = new LongDeserializer();
+        this.headersDeserializer = new HeadersDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] valueTimestampHeaders) {
+        if (valueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(valueTimestampHeaders);
+
+        // Read header size as varint

Review Comment:
   nit: remove



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HeadersSerializerTest {
+
+    private final HeadersSerializer serializer = new HeadersSerializer();
+
+    @Test
+    public void shouldSerializeNullHeaders() {
+        final byte[] serialized = serializer.serialize(null);
+
+        assertNotNull(serialized);
+        assertTrue(serialized.length > 0);
+        assertEquals(1, serialized.length, "Empty header should have 1 byte to indicate headers count is 0");
+    }
+
+    @Test
+    public void shouldSerializeEmptyHeaders() {
+        final Headers headers = new RecordHeaders();
+        final byte[] serialized = serializer.serialize(headers);
+
+        assertNotNull(serialized);
+        assertTrue(serialized.length > 0);
+        assertEquals(1, serialized.length, "Empty header should have 1 byte to indicate headers count is 0");

Review Comment:
   As above



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) to be deserialized by HeadersDeserializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final HeadersDeserializer headersDeserializer;
+
+    ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
+        Objects.requireNonNull(valueDeserializer);
+        this.valueDeserializer = valueDeserializer;
+        this.timestampDeserializer = new LongDeserializer();
+        this.headersDeserializer = new HeadersDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+    }

Review Comment:
   Should we also call `headersDeserializer.configure(....)` even if it's currently empty, to prevent future bugs in case we need to add code there at some point in the future?



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware stores.
+ * <p>
+ * Per KIP-1271, the value format is: [headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g., [timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+    /**
+     * Converts a legacy value (without headers) to the header-embedded format.
+     * <p>
+     * For timestamped stores, the legacy format is: [timestamp(8)][value]
+     * The new format is: [headersSize(varint)][headersBytes][timestamp(8)][value]
+     * <p>
+     * This method adds empty headers to the existing value format.
+     * <p>
+     * The headersBytes format per KIP-1271 is: 
[count(varint)][header1][header2]...
+     * For empty headers, this is simply a single varint encoding 0.
+     *
+     * @param key   the key bytes (may be used for context-dependent 
conversion; typically unused)

Review Comment:
   Not sure why we add the key, if we don't use it?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> data) {
+        if (data == null) {
+            return null;
+        }
+        return serialize(topic, data.value(), data.timestamp(), data.headers());
+    }
+
+    public byte[] serialize(final String topic, final V data, final long timestamp, final Headers headers) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, data);
+
+        // Since we can't control the result of the internal serializer, we make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with Optional-like values
+        // where the Optional.empty case is serialized to null.
+        if (rawValue == null) {
+            return null;
+        }
+
+        final byte[] rawHeaders = headersSerializer.serialize(headers);  // [count][header1][header2]...
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize
+            out.write(rawHeaders);                           // [count][header1][header2]...
+            out.write(rawTimestamp);                         // [timestamp(8)]
+            out.write(rawValue);                             // [value]
+
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize ValueTimestampHeaders", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // ValueTimestampHeadersSerializer never wraps a null serializer (or configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, getter);
+    }
+
+    /**
+     * Compares two serialized records (produced by this serializer) and returns true iff:
+     * - the underlying value bytes and headers are identical, and
+     * - the new timestamp is strictly greater than the old timestamp.
+     * <p>
+     * This method is used for optimization: if values and headers haven't changed and time
+     * is not increasing, we can skip the update.
+     *
+     * @param oldRecord the old serialized record
+     * @param newRecord the new serialized record
+     * @return true if values/headers are same and time is increasing
+     */
+    public static boolean valuesAndHeadersAreSameAndTimeIsIncreasing(
+        final byte[] oldRecord,
+        final byte[] newRecord
+    ) {
+        if (oldRecord == newRecord) {
+            // Same reference, trivially the same (might both be null)
+            return true;
+        } else if (oldRecord == null || newRecord == null) {
+            // Only one is null, cannot be the same
+            return false;
+        } else if (newRecord.length != oldRecord.length) {
+            // Different length, cannot be the same
+            return false;
+        } else if (timeIsDecreasing(oldRecord, newRecord)) {
+            // Time moved backwards, need to update regardless of value changes
+            return false;
+        } else {
+            // All other checks passed, compare binary data
+            return valuesAndHeadersAreSame(oldRecord, newRecord);
+        }
+    }
+
+    /**
+     * Checks if timestamp in newRecord is less than or equal to timestamp in oldRecord.
+     */
+    private static boolean timeIsDecreasing(final byte[] oldRecord, final byte[] newRecord) {

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware stores.
+ * <p>
+ * Per KIP-1271, the value format is: [headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g., [timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+    /**
+     * Converts a legacy value (without headers) to the header-embedded format.

Review Comment:
   It was an open question on the KIP if we support upgrading from plain KV to header format, too.
   
   In any case, might be good to clarify what "legacy" format means?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) to be deserialized by HeadersDeserializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final HeadersDeserializer headersDeserializer;
+
+    ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
+        Objects.requireNonNull(valueDeserializer);
+        this.valueDeserializer = valueDeserializer;
+        this.timestampDeserializer = new LongDeserializer();
+        this.headersDeserializer = new HeadersDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] valueTimestampHeaders) {
+        if (valueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(valueTimestampHeaders);
+
+        // Read header size as varint
+        final int headerSize = ByteUtils.readVarint(buffer);
+
+        // Read headers
+        final byte[] rawHeaders = new byte[headerSize];
+        buffer.get(rawHeaders);
+        final Headers headers = headersDeserializer.deserialize(rawHeaders);

Review Comment:
   If we do lazy deserialization as discussed on the KIP, we should not do it here -- would be eager at this point.
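   A minimal sketch of what a lazy approach could look like. This is purely illustrative: the `Lazy` wrapper and `RawParser` interface are hypothetical stand-ins, and plain Java types are used instead of Kafka's `Headers`/`HeadersDeserializer`; the real change would keep the raw header bytes and defer parsing until `headers()` is first accessed.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: defer deserialization until first access.
public class LazyHeadersSketch {

    // Stand-in for HeadersDeserializer (illustrative only).
    interface RawParser<T> {
        T parse(byte[] raw);
    }

    public static final class Lazy<T> {
        private final byte[] raw;
        private final RawParser<T> parser;
        private T parsed;        // populated on first access only
        private boolean done;

        Lazy(final byte[] raw, final RawParser<T> parser) {
            this.raw = raw;
            this.parser = parser;
        }

        public T get() {
            if (!done) {         // parse at most once, on demand
                parsed = parser.parse(raw);
                done = true;
            }
            return parsed;
        }
    }

    // Example parser: interpret the raw bytes as a UTF-8 string.
    public static Lazy<String> lazyString(final byte[] raw) {
        return new Lazy<>(raw, b -> new String(b, StandardCharsets.UTF_8));
    }

    public static void main(final String[] args) {
        final Lazy<String> lazy = lazyString("k1=v1".getBytes(StandardCharsets.UTF_8));
        // Nothing has been parsed yet; parsing only runs inside get().
        System.out.println(lazy.get()); // prints "k1=v1"
    }
}
```

   With such a wrapper, `deserialize` would store `rawHeaders` instead of eagerly calling `headersDeserializer.deserialize(rawHeaders)`.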



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware stores.
+ * <p>
+ * Per KIP-1271, the value format is: [headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g., [timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+    /**
+     * Converts a legacy value (without headers) to the header-embedded format.
+     * <p>
+     * For timestamped stores, the legacy format is: [timestamp(8)][value]
+     * The new format is: [headersSize(varint)][headersBytes][timestamp(8)][value]
+     * <p>
+     * This method adds empty headers to the existing value format.
+     * <p>
+     * The headersBytes format per KIP-1271 is: [count(varint)][header1][header2]...
+     * For empty headers, this is simply a single varint encoding 0.
+     *
+     * @param key   the key bytes (may be used for context-dependent conversion; typically unused)
+     * @param value the legacy value bytes (for timestamped stores: [timestamp(8)][value])
+     * @return the value in header-embedded format with empty headers
+     */
+    static byte[] convertToHeaderFormat(final byte[] key, final byte[] value) {

Review Comment:
   If we only support upgrading from timestamped stores, might be good to rename `value` to `valueAndTimestamp`?
   
   We are dealing with many formats, so it's useful to express the expected format for `byte[]` arrays.



##########
streams/src/main/java/org/apache/kafka/streams/state/StateSerdes.java:
##########
@@ -153,6 +155,16 @@ public K keyFrom(final byte[] rawKey) {
         return keySerde.deserializer().deserialize(topic, rawKey);
     }
 
+    /**
+     * Deserialize the key from raw bytes.
+     *
+     * @param rawKey  the key as raw bytes
+     * @param headers the record headers passed to the key deserializer
+     * @return        the key as typed object
+     */
+    public K keyFrom(final byte[] rawKey, final Headers headers) {

Review Comment:
   Seems we are missing deprecation annotations for existing methods?
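   For illustration, the usual pattern for deprecating the header-less overload while keeping behavior identical — a standalone toy, not the real `StateSerdes` (the `String` parameter stands in for Kafka's `Headers`):

```java
import java.nio.charset.StandardCharsets;

// Illustrative sketch of deprecating an old overload in favor of a
// header-aware one; names mirror StateSerdes.keyFrom but this is a toy.
public class DeprecationSketch {

    /**
     * @deprecated use {@link #keyFrom(byte[], String)} instead
     */
    @Deprecated
    public static String keyFrom(final byte[] rawKey) {
        return keyFrom(rawKey, null); // delegate so behavior stays identical
    }

    // New overload; `headersDescription` is a stand-in for the Headers parameter.
    public static String keyFrom(final byte[] rawKey, final String headersDescription) {
        return new String(rawKey, StandardCharsets.UTF_8);
    }

    public static void main(final String[] args) {
        System.out.println(keyFrom("key".getBytes(StandardCharsets.UTF_8))); // prints "key"
    }
}
```

   Delegating from the deprecated overload keeps a single code path while callers migrate.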



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) to be deserialized by HeadersDeserializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final HeadersDeserializer headersDeserializer;
+
+    ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
+        Objects.requireNonNull(valueDeserializer);
+        this.valueDeserializer = valueDeserializer;
+        this.timestampDeserializer = new LongDeserializer();
+        this.headersDeserializer = new HeadersDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] valueTimestampHeaders) {
+        if (valueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(valueTimestampHeaders);
+
+        // Read header size as varint
+        final int headerSize = ByteUtils.readVarint(buffer);
+
+        // Read headers
+        final byte[] rawHeaders = new byte[headerSize];
+        buffer.get(rawHeaders);
+        final Headers headers = headersDeserializer.deserialize(rawHeaders);
+
+        // Read timestamp (8 bytes)
+        final byte[] rawTimestamp = new byte[Long.BYTES];
+        buffer.get(rawTimestamp);
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp);
+
+        // Read value (remaining bytes)
+        final byte[] rawValue = new byte[buffer.remaining()];
+        buffer.get(rawValue);
+        final V value = valueDeserializer.deserialize(topic, headers, rawValue);
+
+        return ValueTimestampHeaders.make(value, timestamp, headers);
+    }
+
+    @Override
+    public void close() {
+        valueDeserializer.close();
+        timestampDeserializer.close();
+    }

Review Comment:
   We should also close the header deserializer.
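   The fix is one line (`headersDeserializer.close()` alongside the other two). A self-contained toy showing the intent — every wrapped delegate must be covered by `close()`; the `Recording` class is hypothetical and just tracks which delegates got closed:

```java
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested fix: close() covers every wrapped (de)serializer,
// including the headers one. Recording delegates log their own closure.
public class CloseAllSketch {

    static final class Recording implements Closeable {
        final String name;
        final List<String> closed;

        Recording(final String name, final List<String> closed) {
            this.name = name;
            this.closed = closed;
        }

        @Override
        public void close() {
            closed.add(name);
        }
    }

    public static List<String> closeAll() {
        final List<String> closed = new ArrayList<>();
        final Closeable value = new Recording("value", closed);
        final Closeable timestamp = new Recording("timestamp", closed);
        final Closeable headers = new Recording("headers", closed);
        try {
            value.close();
            timestamp.close();
            headers.close(); // the delegate the PR currently forgets
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
        return closed;
    }

    public static void main(final String[] args) {
        System.out.println(closeAll()); // prints [value, timestamp, headers]
    }
}
```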



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Serializer for Kafka Headers.
+ *
+ * Serialization format (per KIP-1271):
+ * [NumHeaders(varint)][Header1][Header2]...
+ *
+ * Each header:
+ * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
+ *
+ * Note: ValueLength is -1 for null values (encoded as varint).
+ * All integers are encoded as varints (signed varint encoding).
+ *
+ * This serializer produces the headersBytes portion. The headersSize prefix
+ * is added by the outer serializer (e.g., ValueTimestampHeadersSerializer).
+ *
+ * This is used by KIP-1271 to serialize headers for storage in state stores.
+ */
+public class HeadersSerializer {
+
+    /**
+     * Serializes headers into a byte array using varint encoding per KIP-1271.
+     * <p>
+     * The output format is [count][header1][header2]... without a size prefix.
+     * The size prefix is added by the outer serializer that uses this.
+     *
+     * @param headers the headers to serialize (can be null)
+     * @return the serialized byte array
+     */
+    public byte[] serialize(final Headers headers) {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            if (headers == null) {

Review Comment:
   I think we should also cover the zero headers case? (cf my comment below -- if we use `.size` we could use it here, and unify both cases?)
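   A sketch of the unification, under the stated assumption that the count is a signed (zigzag) varint per KIP-1271. The varint writer below mimics `ByteUtils.writeVarint`, and a plain `Map` stands in for Kafka's `Headers`; both `null` and empty collapse into the same single-byte "0 headers" prefix:

```java
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.Map;

// Sketch: null and empty headers serialize identically (varint count 0).
public class HeaderCountSketch {

    // Signed varint (zigzag) encoding, mimicking ByteUtils.writeVarint.
    static void writeVarint(final int value, final ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31);  // zigzag-encode the sign
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);      // 7 payload bits + continuation bit
            v >>>= 7;
        }
        out.write(v);
    }

    public static byte[] serializeCount(final Map<String, byte[]> headers) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Using the size unifies the null and empty branches.
        writeVarint(headers == null ? 0 : headers.size(), out);
        return out.toByteArray();
    }

    public static void main(final String[] args) {
        System.out.println(serializeCount(null).length);                   // prints 1
        System.out.println(serializeCount(Collections.emptyMap()).length); // prints 1
    }
}
```

   Zigzag of 0 is 0, so both cases produce exactly one zero byte, which matches the test expectations below in `HeadersSerializerTest`.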



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware stores.
+ * <p>
+ * Per KIP-1271, the value format is: [headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g., [timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+    /**
+     * Converts a legacy value (without headers) to the header-embedded format.
+     * <p>
+     * For timestamped stores, the legacy format is: [timestamp(8)][value]
+     * The new format is: [headersSize(varint)][headersBytes][timestamp(8)][value]
+     * <p>
+     * This method adds empty headers to the existing value format.
+     * <p>
+     * The headersBytes format per KIP-1271 is: [count(varint)][header1][header2]...
+     * For empty headers, this is simply a single varint encoding 0.
+     *
+     * @param key   the key bytes (may be used for context-dependent conversion; typically unused)
+     * @param value the legacy value bytes (for timestamped stores: [timestamp(8)][value])
+     * @return the value in header-embedded format with empty headers
+     */
+    static byte[] convertToHeaderFormat(final byte[] key, final byte[] value) {
+        if (value == null) {
+            return null;
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   should be `final` -- seems this applies elsewhere on the PR -- please fix everywhere



##########
streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Combines a value with its timestamp and associated record headers.
+ *
+ * @param <V> the value type
+ */
+public final class ValueTimestampHeaders<V> {
+
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    private ValueTimestampHeaders(final V value, final long timestamp, final Headers headers) {
+        this.value = value;
+        this.timestamp = timestamp;
+        this.headers = headers == null ? new RecordHeaders() : headers;
+    }
+
+    /**
+     * Create a new {@link ValueTimestampHeaders} instance if the provided {@code value} is not {@code null}.
+     *
+     * @param value     the value
+     * @param timestamp the timestamp
+     * @param headers   the headers (may be {@code null}, treated as empty)
+     * @param <V>       the type of the value
+     * @return a new {@link ValueTimestampHeaders} instance if the provided {@code value} is not {@code null};
+     * otherwise {@code null} is returned
+     */
+    public static <V> ValueTimestampHeaders<V> make(final V value,
+                                                    final long timestamp,
+                                                    final Headers headers) {
+        if (value == null) {
+            return null;
+        }
+        return new ValueTimestampHeaders<>(value, timestamp, headers);
+    }
+
+    /**
+     * Create a new {@link ValueTimestampHeaders} instance.
+     * The provided {@code value} may be {@code null}.
+     *
+     * @param value     the value (may be {@code null})
+     * @param timestamp the timestamp
+     * @param headers   the headers (may be {@code null}, treated as empty)
+     * @param <V>       the type of the value
+     * @return a new {@link ValueTimestampHeaders} instance
+     */
+    public static <V> ValueTimestampHeaders<V> makeAllowNullable(final V value,
+                                                                 final long timestamp,
+                                                                 final Headers headers) {
+        return new ValueTimestampHeaders<>(value, timestamp, headers);
+    }
+
+    /**
+     * Return the wrapped {@code value} of the given {@code valueTimestampHeaders} parameter
+     * if the parameter is not {@code null}.
+     *
+     * @param valueTimestampHeaders a {@link ValueTimestampHeaders} instance; can be {@code null}
+     * @param <V>                   the type of the value
+     * @return the wrapped {@code value} of {@code valueTimestampHeaders} if not {@code null}; otherwise {@code null}
+     */
+    public static <V> V getValueOrNull(final ValueTimestampHeaders<V> valueTimestampHeaders) {
+        return valueTimestampHeaders == null ? null : valueTimestampHeaders.value;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public Headers headers() {
+        return headers;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof ValueTimestampHeaders)) {
+            return false;
+        }
+        final ValueTimestampHeaders<?> that = (ValueTimestampHeaders<?>) o;
+        // Headers does not implement deep equals; compare serialized forms or iterate.
+        return timestamp == that.timestamp
+            && Objects.equals(value, that.value)
+            && headersEqual(headers, that.headers);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Objects.hash(value, timestamp);
+        result = 31 * result + headersHash(headers);

Review Comment:
   Why do we need to introduce this helper? Shouldn't it be the responsibility of a `Headers` implementation to provide a proper `hashCode()` method -- `RecordHeaders` (and also `RecordHeader`) both do.
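   In other words, if the headers type provides a proper `hashCode()` (as `RecordHeaders` does), the wrapper can lean on `Objects.hash` directly and the `headersHash` helper disappears. A standalone toy of that shape, with `List` standing in for `Headers`:

```java
import java.util.List;
import java.util.Objects;

// Sketch: combine hashes by delegating to each field's own hashCode().
public class HashCodeSketch {

    public static int combinedHash(final String value, final long timestamp, final List<String> headers) {
        // Objects.hash calls hashCode() on every argument, headers included.
        return Objects.hash(value, timestamp, headers);
    }

    public static void main(final String[] args) {
        final int a = combinedHash("v", 42L, List.of("h1"));
        final int b = combinedHash("v", 42L, List.of("h1"));
        System.out.println(a == b); // prints true: equal inputs hash equally
    }
}
```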



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/HeadersSerializerTest.java:
##########
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class HeadersSerializerTest {
+
+    private final HeadersSerializer serializer = new HeadersSerializer();
+
+    @Test
+    public void shouldSerializeNullHeaders() {
+        final byte[] serialized = serializer.serialize(null);
+
+        assertNotNull(serialized);
+        assertTrue(serialized.length > 0);
+        assertEquals(1, serialized.length, "Null headers should serialize to a single byte encoding a header count of 0");
+    }
+
+    @Test
+    public void shouldSerializeEmptyHeaders() {
+        final Headers headers = new RecordHeaders();
+        final byte[] serialized = serializer.serialize(headers);
+
+        assertNotNull(serialized);
+        assertTrue(serialized.length > 0);
+        assertEquals(1, serialized.length, "Empty headers should serialize to a single byte encoding a header count of 0");
+    }
+
+    @Test
+    public void shouldSerializeSingleHeader() {
+        final Headers headers = new RecordHeaders()
+            .add("key1", "value1".getBytes());
+        final byte[] serialized = serializer.serialize(headers);
+
+        assertNotNull(serialized);
+        assertTrue(serialized.length > 0);

Review Comment:
   This test is somewhat weak. Should we deserialize instead for verification? (Same below)
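To make the round-trip suggestion concrete: the header layout described in the serializer's javadoc can be checked end-to-end without Kafka classes. The sketch below is an editorial illustration only — `encodeHeaders`/`decodeHeaders` are hypothetical stand-ins, not the PR's API — assuming the signed (zigzag) varint layout `[count][keyLen][key][valLen][val]` with `valLen == -1` for null values, as quoted above:

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeadersRoundTripSketch {

    // Signed varint (zigzag), mirroring the behavior described for ByteUtils.writeVarint
    static void writeVarint(int value, ByteArrayOutputStream out) {
        int v = (value << 1) ^ (value >> 31);            // zigzag-encode
        while ((v & 0xFFFFFF80) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }

    static int readVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
        }
        value |= b << shift;
        return (value >>> 1) ^ -(value & 1);             // zigzag-decode
    }

    // [count][keyLen][key][valLen][val]... ; valLen == -1 marks a null value
    static byte[] encodeHeaders(Map<String, byte[]> headers) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(headers.size(), out);
        for (Map.Entry<String, byte[]> h : headers.entrySet()) {
            byte[] key = h.getKey().getBytes(StandardCharsets.UTF_8);
            writeVarint(key.length, out);
            out.writeBytes(key);
            byte[] value = h.getValue();
            writeVarint(value == null ? -1 : value.length, out);
            if (value != null) {
                out.writeBytes(value);
            }
        }
        return out.toByteArray();
    }

    static Map<String, byte[]> decodeHeaders(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = readVarint(buf);
        Map<String, byte[]> headers = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] key = new byte[readVarint(buf)];
            buf.get(key);
            int valueLength = readVarint(buf);
            byte[] value = null;
            if (valueLength >= 0) {
                value = new byte[valueLength];
                buf.get(value);
            }
            headers.put(new String(key, StandardCharsets.UTF_8), value);
        }
        return headers;
    }

    public static void main(String[] args) {
        // Empty headers: a single byte (varint 0), matching the test's expectation
        assert encodeHeaders(new LinkedHashMap<>()).length == 1;

        Map<String, byte[]> headers = new LinkedHashMap<>();
        headers.put("key1", "value1".getBytes(StandardCharsets.UTF_8));
        headers.put("tombstone", null);                  // null value round-trips via -1

        Map<String, byte[]> decoded = decodeHeaders(encodeHeaders(headers));
        assert decoded.keySet().equals(headers.keySet());
        assert Arrays.equals(decoded.get("key1"), headers.get("key1"));
        assert decoded.get("tombstone") == null;
        System.out.println("round trip ok");
    }
}
```

A test written this way verifies key order, null-value handling, and the one-byte empty encoding in a single assertion pass, rather than only checking the output length.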



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java:
##########
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableDeserializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableDeserializer;
+
+/**
+ * Deserializer for ValueTimestampHeaders.
+ *
+ * Deserialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) to be deserialized by HeadersDeserializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value to be deserialized with the provided value deserializer
+ *
+ * This is used by KIP-1271 to deserialize values with timestamps and headers from state stores.
+ */
+class ValueTimestampHeadersDeserializer<V> implements WrappingNullableDeserializer<ValueTimestampHeaders<V>, Void, V> {
+    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+    public final Deserializer<V> valueDeserializer;
+    private final Deserializer<Long> timestampDeserializer;
+    private final HeadersDeserializer headersDeserializer;
+
+    ValueTimestampHeadersDeserializer(final Deserializer<V> valueDeserializer) {
+        Objects.requireNonNull(valueDeserializer);
+        this.valueDeserializer = valueDeserializer;
+        this.timestampDeserializer = new LongDeserializer();
+        this.headersDeserializer = new HeadersDeserializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueDeserializer.configure(configs, isKey);
+        timestampDeserializer.configure(configs, isKey);
+    }
+
+    @Override
+    public ValueTimestampHeaders<V> deserialize(final String topic, final byte[] valueTimestampHeaders) {
+        if (valueTimestampHeaders == null) {
+            return null;
+        }
+
+        final ByteBuffer buffer = ByteBuffer.wrap(valueTimestampHeaders);
+
+        // Read header size as varint
+        final int headerSize = ByteUtils.readVarint(buffer);
+
+        // Read headers
+        final byte[] rawHeaders = new byte[headerSize];
+        buffer.get(rawHeaders);
+        final Headers headers = headersDeserializer.deserialize(rawHeaders);
+
+        // Read timestamp (8 bytes)
+        final byte[] rawTimestamp = new byte[Long.BYTES];
+        buffer.get(rawTimestamp);
+        final long timestamp = timestampDeserializer.deserialize(topic, rawTimestamp);
+
+        // Read value (remaining bytes)
+        final byte[] rawValue = new byte[buffer.remaining()];
+        buffer.get(rawValue);
+        final V value = valueDeserializer.deserialize(topic, headers, rawValue);
+
+        return ValueTimestampHeaders.make(value, timestamp, headers);

Review Comment:
   Seems we might need a change to the KIP to allow for lazy deserialization?
   
   \cc @aliehsaeedii 



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> data) {
+        if (data == null) {
+            return null;
+        }
+        return serialize(topic, data.value(), data.timestamp(), data.headers());
+    }
+
+    public byte[] serialize(final String topic, final V data, final long timestamp, final Headers headers) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, data);
+
+        // Since we can't control the result of the internal serializer, we make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with Optional-like values
+        // where the Optional.empty case is serialized to null.
+        if (rawValue == null) {
+            return null;
+        }
+
+        final byte[] rawHeaders = headersSerializer.serialize(headers);  // [count][header1][header2]...
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize
+            out.write(rawHeaders);                           // [count][header1][header2]...
+            out.write(rawTimestamp);                         // [timestamp(8)]
+            out.write(rawValue);                             // [value]
+
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize ValueTimestampHeaders", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // ValueTimestampHeadersSerializer never wraps a null serializer (or configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, getter);
+    }
+
+    /**
+     * Compares two serialized records (produced by this serializer) and returns true iff:
+     * - the underlying value bytes and headers are identical, and
+     * - the new timestamp is strictly greater than the old timestamp.
+     * <p>
+     * This method is used for optimization: if values and headers haven't changed and time
+     * is increasing, we can skip the update.
+     *
+     * @param oldRecord the old serialized record
+     * @param newRecord the new serialized record
+     * @return true if values/headers are same and time is increasing
+     */
+    public static boolean valuesAndHeadersAreSameAndTimeIsIncreasing(
+        final byte[] oldRecord,
+        final byte[] newRecord
+    ) {
+        if (oldRecord == newRecord) {
+            // Same reference, trivially the same (might both be null)
+            return true;
+        } else if (oldRecord == null || newRecord == null) {
+            // Only one is null, cannot be the same
+            return false;
+        } else if (newRecord.length != oldRecord.length) {
+            // Different length, cannot be the same
+            return false;
+        } else if (timeIsDecreasing(oldRecord, newRecord)) {
+            // Time moved backwards, need to update regardless of value changes
+            return false;
+        } else {
+            // All other checks passed, compare binary data
+            return valuesAndHeadersAreSame(oldRecord, newRecord);
+        }
+    }
+
+    /**
+     * Checks if timestamp in newRecord is less than or equal to timestamp in oldRecord.
+     */
+    private static boolean timeIsDecreasing(final byte[] oldRecord, final byte[] newRecord) {
+        return extractTimestamp(newRecord) <= extractTimestamp(oldRecord);
+    }
+
+    /**
+     * Extracts the timestamp from a serialized record.
+     * Format: [headersSize][headersBytes][timestamp(8)][value]
+     */
+    private static long extractTimestamp(final byte[] bytes) {

Review Comment:
   same



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> data) {
+        if (data == null) {
+            return null;
+        }
+        return serialize(topic, data.value(), data.timestamp(), data.headers());
+    }
+
+    public byte[] serialize(final String topic, final V data, final long timestamp, final Headers headers) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, data);
+
+        // Since we can't control the result of the internal serializer, we make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with Optional-like values
+        // where the Optional.empty case is serialized to null.
+        if (rawValue == null) {
+            return null;
+        }
+
+        final byte[] rawHeaders = headersSerializer.serialize(headers);  // [count][header1][header2]...
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize
+            out.write(rawHeaders);                           // [count][header1][header2]...
+            out.write(rawTimestamp);                         // [timestamp(8)]
+            out.write(rawValue);                             // [value]
+
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize ValueTimestampHeaders", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // ValueTimestampHeadersSerializer never wraps a null serializer (or configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, getter);
+    }
+
+    /**
+     * Compares two serialized records (produced by this serializer) and returns true iff:
+     * - the underlying value bytes and headers are identical, and
+     * - the new timestamp is strictly greater than the old timestamp.
+     * <p>
+     * This method is used for optimization: if values and headers haven't changed and time
+     * is increasing, we can skip the update.
+     *
+     * @param oldRecord the old serialized record
+     * @param newRecord the new serialized record
+     * @return true if values/headers are same and time is increasing
+     */
+    public static boolean valuesAndHeadersAreSameAndTimeIsIncreasing(

Review Comment:
   Do we need this? Seems to be an artifact of KIP-557 which we never completed?
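For readers following the KIP-557 discussion: the comparison helpers depend only on the documented layout `[headersSize(varint)][headersBytes][timestamp(8)][value]`. As an editorial sketch with plain JDK code (not the PR's implementation; `readVarint` assumes the same zigzag varint as `ByteUtils`), extracting the timestamp means skipping the varint-prefixed headers section and reading eight big-endian bytes:

```java
import java.nio.ByteBuffer;

public class TimestampExtractSketch {

    static int readVarint(ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        byte b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
        }
        value |= b << shift;
        return (value >>> 1) ^ -(value & 1);   // zigzag-decode
    }

    // Skip [headersSize(varint)][headersBytes], then read the 8-byte timestamp.
    static long extractTimestamp(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        int headersSize = readVarint(buf);
        buf.position(buf.position() + headersSize);
        return buf.getLong();
    }

    public static void main(String[] args) {
        // Build a record: headersSize=3 (zigzag varint 0x06), 3 header bytes,
        // timestamp 42, then a 1-byte value
        byte[] headersBytes = {0x00, 0x01, 0x02};
        ByteBuffer buf = ByteBuffer.allocate(1 + headersBytes.length + Long.BYTES + 1);
        buf.put((byte) 0x06);                  // zigzag(3) = 6, fits in one varint byte
        buf.put(headersBytes);
        buf.putLong(42L);
        buf.put((byte) 'v');

        assert extractTimestamp(buf.array()) == 42L;
        System.out.println(extractTimestamp(buf.array()));
    }
}
```

Note the timestamp offset is not fixed: it depends on both the varint width of `headersSize` and the headers payload, which is why the helper must re-parse the prefix on every comparison.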



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> data) {
+        if (data == null) {
+            return null;
+        }
+        return serialize(topic, data.value(), data.timestamp(), data.headers());
+    }
+
+    public byte[] serialize(final String topic, final V data, final long timestamp, final Headers headers) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, data);
+
+        // Since we can't control the result of the internal serializer, we make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with Optional-like values
+        // where the Optional.empty case is serialized to null.
+        if (rawValue == null) {
+            return null;
+        }
+
+        final byte[] rawHeaders = headersSerializer.serialize(headers);  // [count][header1][header2]...
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize
+            out.write(rawHeaders);                           // [count][header1][header2]...
+            out.write(rawTimestamp);                         // [timestamp(8)]
+            out.write(rawValue);                             // [value]
+
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize ValueTimestampHeaders", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+    }
+
+    @Override
+    public void setIfUnset(final SerdeGetter getter) {
+        // ValueTimestampHeadersSerializer never wraps a null serializer (or configure would throw),
+        // but it may wrap a serializer that itself wraps a null serializer.
+        initNullableSerializer(valueSerializer, getter);
+    }
+
+    /**
+     * Compares two serialized records (produced by this serializer) and returns true iff:
+     * - the underlying value bytes and headers are identical, and
+     * - the new timestamp is strictly greater than the old timestamp.
+     * <p>
+     * This method is used for optimization: if values and headers haven't changed and time
+     * is increasing, we can skip the update.
+     *
+     * @param oldRecord the old serialized record
+     * @param newRecord the new serialized record
+     * @return true if values/headers are same and time is increasing
+     */
+    public static boolean valuesAndHeadersAreSameAndTimeIsIncreasing(
+        final byte[] oldRecord,
+        final byte[] newRecord
+    ) {
+        if (oldRecord == newRecord) {
+            // Same reference, trivially the same (might both be null)
+            return true;
+        } else if (oldRecord == null || newRecord == null) {
+            // Only one is null, cannot be the same
+            return false;
+        } else if (newRecord.length != oldRecord.length) {
+            // Different length, cannot be the same
+            return false;
+        } else if (timeIsDecreasing(oldRecord, newRecord)) {
+            // Time moved backwards, need to update regardless of value changes
+            return false;
+        } else {
+            // All other checks passed, compare binary data
+            return valuesAndHeadersAreSame(oldRecord, newRecord);
+        }
+    }
+
+    /**
+     * Checks if timestamp in newRecord is less than or equal to timestamp in oldRecord.
+     */
+    private static boolean timeIsDecreasing(final byte[] oldRecord, final byte[] newRecord) {
+        return extractTimestamp(newRecord) <= extractTimestamp(oldRecord);
+    }
+
+    /**
+     * Extracts the timestamp from a serialized record.
+     * Format: [headersSize][headersBytes][timestamp(8)][value]
+     */
+    private static long extractTimestamp(final byte[] bytes) {
+        final ByteBuffer buffer = ByteBuffer.wrap(bytes);
+
+        // Skip headersSize and headersBytes
+        final int headersSize = ByteUtils.readVarint(buffer);
+        buffer.position(buffer.position() + headersSize);
+
+        // Read timestamp (8 bytes)
+        return buffer.getLong();
+    }
+
+    /**
+     * Checks if values and headers are the same in two serialized records.
+     * Compares headers section and value section, skipping the timestamp.
+     */
+    private static boolean valuesAndHeadersAreSame(final byte[] left, final byte[] right) {

Review Comment:
   as above



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> data) {
+        if (data == null) {
+            return null;
+        }
+        return serialize(topic, data.value(), data.timestamp(), data.headers());
+    }
+
+    public byte[] serialize(final String topic, final V data, final long timestamp, final Headers headers) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, data);
+
+        // Since we can't control the result of the internal serializer, we make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with Optional-like values
+        // where the Optional.empty case is serialized to null.
+        if (rawValue == null) {

Review Comment:
   I was just comparing to `ValueAndTimestampSerializer` -- it also contains a 
link to a discussion in the comment -- would be good to add here, too



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for ValueTimestampHeaders.
+ *
+ * Serialization format (per KIP-1271):
+ * [headersSize(varint)][headersBytes][timestamp(8)][value]
+ *
+ * Where:
+ * - headersSize: Size of the headersBytes section in bytes, encoded as varint
+ * - headersBytes: Serialized headers ([count(varint)][header1][header2]...) from HeadersSerializer
+ * - timestamp: 8-byte long timestamp
+ * - value: Serialized value using the provided value serializer
+ *
+ * This is used by KIP-1271 to serialize values with timestamps and headers for state stores.
+ */
+public class ValueTimestampHeadersSerializer<V> implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+    public final Serializer<V> valueSerializer;
+    private final Serializer<Long> timestampSerializer;
+    private final HeadersSerializer headersSerializer;
+
+    ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+        Objects.requireNonNull(valueSerializer);
+        this.valueSerializer = valueSerializer;
+        this.timestampSerializer = new LongSerializer();
+        this.headersSerializer = new HeadersSerializer();
+    }
+
+    @Override
+    public void configure(final Map<String, ?> configs, final boolean isKey) {
+        valueSerializer.configure(configs, isKey);
+        timestampSerializer.configure(configs, isKey);
+    }
+
+    @Override
+    public byte[] serialize(final String topic, final ValueTimestampHeaders<V> data) {
+        if (data == null) {
+            return null;
+        }
+        return serialize(topic, data.value(), data.timestamp(), data.headers());
+    }
+
+    public byte[] serialize(final String topic, final V data, final long timestamp, final Headers headers) {
+        if (data == null) {
+            return null;
+        }
+
+        final byte[] rawValue = valueSerializer.serialize(topic, headers, data);
+
+        // Since we can't control the result of the internal serializer, we make sure that the result
+        // is not null as well.
+        // Serializing non-null values to null can be useful when working with Optional-like values
+        // where the Optional.empty case is serialized to null.
+        if (rawValue == null) {
+            return null;
+        }
+
+        final byte[] rawHeaders = headersSerializer.serialize(headers);  // [count][header1][header2]...
+        final byte[] rawTimestamp = timestampSerializer.serialize(topic, timestamp);
+
+        // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            ByteUtils.writeVarint(rawHeaders.length, out);  // headersSize
+            out.write(rawHeaders);                           // [count][header1][header2]...
+            out.write(rawTimestamp);                         // [timestamp(8)]
+            out.write(rawValue);                             // [value]
+
+            return baos.toByteArray();
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to serialize ValueTimestampHeaders", e);
+        }
+    }
+
+    @Override
+    public void close() {
+        valueSerializer.close();
+        timestampSerializer.close();
+    }

Review Comment:
   close headers serializer



##########
streams/src/main/java/org/apache/kafka/streams/state/ValueTimestampHeaders.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * Combines a value with its timestamp and associated record headers.
+ *
+ * @param <V> the value type
+ */
+public final class ValueTimestampHeaders<V> {
+
+    private final V value;
+    private final long timestamp;
+    private final Headers headers;
+
+    private ValueTimestampHeaders(final V value, final long timestamp, final Headers headers) {
+        this.value = value;
+        this.timestamp = timestamp;
+        this.headers = headers == null ? new RecordHeaders() : headers;
+    }
+
+    /**
+     * Create a new {@link ValueTimestampHeaders} instance if the provided {@code value} is not {@code null}.
+     *
+     * @param value     the value
+     * @param timestamp the timestamp
+     * @param headers   the headers (may be {@code null}, treated as empty)
+     * @param <V>       the type of the value
+     * @return a new {@link ValueTimestampHeaders} instance if the provided {@code value} is not {@code null};
+     * otherwise {@code null} is returned
+     */
+    public static <V> ValueTimestampHeaders<V> make(final V value,
+                                                    final long timestamp,
+                                                    final Headers headers) {
+        if (value == null) {
+            return null;
+        }
+        return new ValueTimestampHeaders<>(value, timestamp, headers);
+    }
+
+    /**
+     * Create a new {@link ValueTimestampHeaders} instance.
+     * The provided {@code value} may be {@code null}.
+     *
+     * @param value     the value (may be {@code null})
+     * @param timestamp the timestamp
+     * @param headers   the headers (may be {@code null}, treated as empty)
+     * @param <V>       the type of the value
+     * @return a new {@link ValueTimestampHeaders} instance
+     */
+    public static <V> ValueTimestampHeaders<V> makeAllowNullable(final V value,
+                                                                 final long timestamp,
+                                                                 final Headers headers) {
+        return new ValueTimestampHeaders<>(value, timestamp, headers);
+    }
+
+    /**
+     * Return the wrapped {@code value} of the given {@code valueTimestampHeaders} parameter
+     * if the parameter is not {@code null}.
+     *
+     * @param valueTimestampHeaders a {@link ValueTimestampHeaders} instance; can be {@code null}
+     * @param <V>                   the type of the value
+     * @return the wrapped {@code value} of {@code valueTimestampHeaders} if not {@code null}; otherwise {@code null}
+     */
+    public static <V> V getValueOrNull(final ValueTimestampHeaders<V> valueTimestampHeaders) {
+        return valueTimestampHeaders == null ? null : valueTimestampHeaders.value;
+    }
+
+    public V value() {
+        return value;
+    }
+
+    public long timestamp() {
+        return timestamp;
+    }
+
+    public Headers headers() {
+        return headers;

Review Comment:
   The lazy deserialization would need to happen here?
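One possible shape for that lazy deserialization, as a sketch only: the class and method names below are hypothetical (not the PR's code), and decoded headers are modeled as plain key/value pairs rather than `RecordHeaders`. The raw `headersBytes` are kept as-is and decoded on the first `headers()` call, so reads that never touch headers pay no decode cost.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of a lazily-deserializing headers holder.
final class LazyHeadersSketch {
    private final byte[] rawHeaders;                  // [count][keyLen][key][valLen][val]... (signed varints)
    private List<Map.Entry<String, byte[]>> decoded;  // null until first access

    LazyHeadersSketch(final byte[] rawHeaders) {
        this.rawHeaders = rawHeaders;
    }

    synchronized List<Map.Entry<String, byte[]>> headers() {
        if (decoded == null) {
            decoded = decode(rawHeaders);             // decode exactly once, on demand
        }
        return decoded;
    }

    // Reads one zigzag-encoded signed varint (same scheme as ByteUtils.readVarint).
    private static int readVarint(final ByteBuffer buf) {
        int value = 0;
        int shift = 0;
        int b;
        while (((b = buf.get()) & 0x80) != 0) {
            value |= (b & 0x7F) << shift;
            shift += 7;
        }
        value |= b << shift;
        return (value >>> 1) ^ -(value & 1);          // un-zigzag
    }

    private static List<Map.Entry<String, byte[]>> decode(final byte[] bytes) {
        final ByteBuffer buf = ByteBuffer.wrap(bytes);
        final int count = readVarint(buf);
        final List<Map.Entry<String, byte[]>> result = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            final byte[] key = new byte[readVarint(buf)];
            buf.get(key);
            final int valueLength = readVarint(buf);  // -1 marks a null header value
            byte[] value = null;
            if (valueLength >= 0) {
                value = new byte[valueLength];
                buf.get(value);
            }
            result.add(new SimpleEntry<>(new String(key, StandardCharsets.UTF_8), value));
        }
        return result;
    }
}
```

For example, the bytes `{2, 2, 97, 1}` decode to a single header with key `"a"` and a null value: count=1 and keyLen=1 both zigzag-encode to 0x02, `'a'` is 0x61, and valLen=-1 zigzags to 0x01.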



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.

Review Comment:
   Cf comment below -- not sure if "plain value" is correct? We might only 
support upgrading from `ValueAndTimestamp` byte format to the new header byte 
format



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.
+ * <p>
+ * This is analogous to {@link TimestampedBytesStore} but for header-aware 
stores.
+ * <p>
+ * Per KIP-1271, the value format is: [headersSize(varint)][headersBytes][payloadBytes]
+ * where payloadBytes is the existing serialized value (e.g., [timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+    /**
+     * Converts a legacy value (without headers) to the header-embedded format.
+     * <p>
+     * For timestamped stores, the legacy format is: [timestamp(8)][value]
+     * The new format is: [headersSize(varint)][headersBytes][timestamp(8)][value]
+     * <p>
+     * This method adds empty headers to the existing value format.
+     * <p>
+     * The headersBytes format per KIP-1271 is: [count(varint)][header1][header2]...
+     * For empty headers, this is simply a single varint encoding 0.
+     *
+     * @param key   the key bytes (may be used for context-dependent conversion; typically unused)
+     * @param value the legacy value bytes (for timestamped stores: [timestamp(8)][value])
+     * @return the value in header-embedded format with empty headers
+     */
+    static byte[] convertToHeaderFormat(final byte[] key, final byte[] value) {
+        if (value == null) {
+            return null;
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   The below implementation seems unnecessarily complex? We know that we insert empty headers, so it seems simpler to just do:
   ```
   return ByteBuffer
               .allocate(1 + value.length)
               .put((byte) 0) // this should be varint encoding for zero?
               .put(value)
               .array();
   ```
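To back the hedge in the suggestion above: under the zigzag scheme used by `ByteUtils.writeVarint`, the varint encoding of 0 is indeed the single byte 0x00, so a one-byte prefix suffices for the empty-headers case. A minimal re-implementation for illustration (the real one lives in `org.apache.kafka.common.utils.ByteUtils`):

```java
import java.io.ByteArrayOutputStream;

// Minimal zigzag varint writer mirroring ByteUtils.writeVarint (illustrative only).
final class VarintSketch {
    static byte[] writeVarint(final int value) {
        int v = (value << 1) ^ (value >> 31);  // zigzag: 0->0, -1->1, 1->2, -2->3, ...
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        while ((v & 0xFFFFFF80) != 0) {        // emit 7 bits at a time, high bit = "more"
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
        return out.toByteArray();
    }
}
```

`writeVarint(0)` yields the single byte 0x00, and `writeVarint(-1)` yields 0x01, matching the -1 null-value marker used in the header format.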



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+
+/**
+ * Serializer for Kafka Headers.
+ *
+ * Serialization format (per KIP-1271):
+ * [NumHeaders(varint)][Header1][Header2]...
+ *
+ * Each header:
+ * [KeyLength(varint)][KeyBytes(UTF-8)][ValueLength(varint)][ValueBytes]
+ *
+ * Note: ValueLength is -1 for null values (encoded as varint).
+ * All integers are encoded as varints (signed varint encoding).
+ *
+ * This serializer produces the headersBytes portion. The headersSize prefix
+ * is added by the outer serializer (e.g., ValueTimestampHeadersSerializer).
+ *
+ * This is used by KIP-1271 to serialize headers for storage in state stores.
+ */
+public class HeadersSerializer {
+
+    /**
+     * Serializes headers into a byte array using varint encoding per KIP-1271.
+     * <p>
+     * The output format is [count][header1][header2]... without a size prefix.
+     * The size prefix is added by the outer serializer that uses this.
+     *
+     * @param headers the headers to serialize (can be null)
+     * @return the serialized byte array
+     */
+    public byte[] serialize(final Headers headers) {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();

Review Comment:
   Should be `final` -- similar elsewhere
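As a cross-check of the documented `[count(varint)][keyLen(varint)][keyBytes][valLen(varint)][valBytes]` layout, here is a self-contained encoder sketch. It is illustrative only, not the PR's `HeadersSerializer`: headers are passed as parallel key/value arrays for simplicity, and the varint writer re-implements the zigzag scheme of `ByteUtils.writeVarint`.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Illustrative encoder for the documented headersBytes layout; valLen = -1 marks a null value.
final class HeaderEncodingSketch {
    static void writeVarint(final int value, final DataOutputStream out) throws IOException {
        int v = (value << 1) ^ (value >> 31);  // zigzag encoding
        while ((v & 0xFFFFFF80) != 0) {
            out.writeByte((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.writeByte(v);
    }

    // Encodes headers given as parallel (key, value) arrays.
    static byte[] encode(final String[] keys, final byte[][] values) {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
             DataOutputStream out = new DataOutputStream(baos)) {
            writeVarint(keys.length, out);               // header count
            for (int i = 0; i < keys.length; i++) {
                final byte[] key = keys[i].getBytes(StandardCharsets.UTF_8);
                writeVarint(key.length, out);
                out.write(key);
                if (values[i] == null) {
                    writeVarint(-1, out);                // null-value marker
                } else {
                    writeVarint(values[i].length, out);
                    out.write(values[i]);
                }
            }
            out.flush();
            return baos.toByteArray();
        } catch (final IOException e) {
            throw new RuntimeException("Failed to encode headers", e);  // cannot happen for in-memory streams
        }
    }
}
```

Encoding the single header `("a", null)` yields the four bytes `{0x02, 0x02, 0x61, 0x01}`: count=1 and keyLen=1 both zigzag to 0x02, `'a'` is 0x61, and valLen=-1 zigzags to 0x01.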



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
