mjsax commented on code in PR #21345:
URL: https://github.com/apache/kafka/pull/21345#discussion_r2761697267


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/*
+ * Serializer for org.apache.kafka.common.header.Headers using the KIP-1271 format:
+ *
+ * headers_bytes :=
+ *   varint header_count
+ *   repeated header_count times:
+ *     varint key_length
+ *     key_length bytes of UTF-8 key
+ *     varint value_length   // -1 means null
+ *     value_length bytes of value (if value_length >= 0)
+ *
+ * Note: "varint" here is a signed int encoded with zigzag + LEB128.
+ */
+public final class HeadersSerializer implements Serializer<Headers> {
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+    // no-op
+  }
+
+  @Override
+  public byte[] serialize(final String topic, final Headers headers) {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    if (headers == null || !headers.iterator().hasNext()) {
+      // Empty headers: return [varint(0)] instead of []
+      byte[] headersLength = getLengthAsVarint(new byte[0]);
+      return headersLength;
+    }
+
+    // 1) header_count
+    final int headerCount = count(headers);
+    writeVarint(headerCount, out);
+
+    // 2) each header in iteration order (order is preserved)
+    for (final Header header : headers) {
+      final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
+      final byte[] valueBytes = header.value();
+
+      // key length + key bytes
+      writeVarint(keyBytes.length, out);
+      out.writeBytes(keyBytes);
+
+      // value length (-1 for null) + value bytes
+      if (valueBytes == null) {
+        writeVarint(-1, out);
+      } else {
+        writeVarint(valueBytes.length, out);
+        out.writeBytes(valueBytes);
+      }
+    }
+
+    byte[] headersBytes = out.toByteArray();
+    byte[] headersLength = getLengthAsVarint(headersBytes);
+    return ByteBuffer.allocate(headersLength.length + headersBytes.length)
+        .put(headersLength)
+        .put(headersBytes)
+        .array();
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  // ----------------------------------------------------------------------
+  // Varint encoding (signed int via zigzag + LEB128)
+  // ----------------------------------------------------------------------
+
+  private static void writeVarint(final int value, final ByteArrayOutputStream out) {
+    // Zigzag transform: map signed int -> unsigned so that small negative
+    // and small positive values both get small encodings.
+    int v = (value << 1) ^ (value >> 31);
+
+    while ((v & ~0x7F) != 0) {
+      out.write((v & 0x7F) | 0x80);
+      v >>>= 7;
+    }
+    out.write(v & 0x7F);
+  }
+
+  private static int count(final Headers headers) {
+    int c = 0;
+    for (final Header ignored : headers) {
+      c++;
+    }
+    return c;
+  }
+
+  private byte[] getLengthAsVarint(byte[] data) {
+    if (data == null) {
+      return encodeVarint(-1); // Handle null as -1
+    }
+    return encodeVarint(data.length);
+  }
+
+  /**
+   * Encodes an integer into an unsigned varint (LEB128).
+   */
+  private byte[] encodeVarint(int value) {

Review Comment:
   Same



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/*
+ * Serializer for org.apache.kafka.common.header.Headers using the KIP-1271 format:
+ *
+ * headers_bytes :=
+ *   varint header_count
+ *   repeated header_count times:
+ *     varint key_length
+ *     key_length bytes of UTF-8 key
+ *     varint value_length   // -1 means null
+ *     value_length bytes of value (if value_length >= 0)
+ *
+ * Note: "varint" here is a signed int encoded with zigzag + LEB128.
+ */
+public final class HeadersSerializer implements Serializer<Headers> {
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+    // no-op
+  }
+
+  @Override
+  public byte[] serialize(final String topic, final Headers headers) {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    if (headers == null || !headers.iterator().hasNext()) {
+      // Empty headers: return [varint(0)] instead of []
+      byte[] headersLength = getLengthAsVarint(new byte[0]);
+      return headersLength;
+    }
+
+    // 1) header_count
+    final int headerCount = count(headers);

Review Comment:
   Should we use `headers.toArray().length` instead of adding `count`?
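   
   Roughly (sketch; `Headers#toArray()` returns a `Header[]`, so it would be `length` rather than `size()`):
   
   ```java
   // Sketch: materialize the headers once and reuse the array for both the
   // count and the iteration, avoiding a separate counting pass.
   final Header[] headerArray = headers.toArray();
   writeVarint(headerArray.length, out);
   for (final Header header : headerArray) {
       // serialize key/value as before
   }
   ```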



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersSerializer.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/*
+ * Serializer for org.apache.kafka.common.header.Headers using the KIP-1271 format:
+ *
+ * headers_bytes :=
+ *   varint header_count
+ *   repeated header_count times:
+ *     varint key_length
+ *     key_length bytes of UTF-8 key
+ *     varint value_length   // -1 means null
+ *     value_length bytes of value (if value_length >= 0)
+ *
+ * Note: "varint" here is a signed int encoded with zigzag + LEB128.
+ */
+public final class HeadersSerializer implements Serializer<Headers> {
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+    // no-op
+  }
+
+  @Override
+  public byte[] serialize(final String topic, final Headers headers) {
+    final ByteArrayOutputStream out = new ByteArrayOutputStream();
+
+    if (headers == null || !headers.iterator().hasNext()) {
+      // Empty headers: return [varint(0)] instead of []
+      byte[] headersLength = getLengthAsVarint(new byte[0]);
+      return headersLength;
+    }
+
+    // 1) header_count
+    final int headerCount = count(headers);
+    writeVarint(headerCount, out);
+
+    // 2) each header in iteration order (order is preserved)
+    for (final Header header : headers) {
+      final byte[] keyBytes = header.key().getBytes(StandardCharsets.UTF_8);
+      final byte[] valueBytes = header.value();
+
+      // key length + key bytes
+      writeVarint(keyBytes.length, out);
+      out.writeBytes(keyBytes);
+
+      // value length (-1 for null) + value bytes
+      if (valueBytes == null) {
+        writeVarint(-1, out);
+      } else {
+        writeVarint(valueBytes.length, out);
+        out.writeBytes(valueBytes);
+      }
+    }
+
+    byte[] headersBytes = out.toByteArray();
+    byte[] headersLength = getLengthAsVarint(headersBytes);
+    return ByteBuffer.allocate(headersLength.length + headersBytes.length)
+        .put(headersLength)
+        .put(headersBytes)
+        .array();
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  // ----------------------------------------------------------------------
+  // Varint encoding (signed int via zigzag + LEB128)
+  // ----------------------------------------------------------------------
+
+  private static void writeVarint(final int value, final ByteArrayOutputStream out) {

Review Comment:
   Similar question: there should hopefully be existing code we can reuse?
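   
   For example, `org.apache.kafka.common.utils.ByteUtils` already ships the zigzag + LEB128 varint encoding used for record headers. A sketch of delegating to it:
   
   ```java
   // Sketch: reuse ByteUtils instead of a local varint implementation.
   // Requires imports: org.apache.kafka.common.utils.ByteUtils,
   // java.io.DataOutputStream, java.io.IOException, java.io.UncheckedIOException.
   private static void writeVarint(final int value, final ByteArrayOutputStream out) {
       try {
           ByteUtils.writeVarint(value, new DataOutputStream(out));
       } catch (final IOException e) {
           // cannot happen when writing to a ByteArrayOutputStream
           throw new UncheckedIOException(e);
       }
   }
   ```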



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, byte[]&gt;,
+ * hence we use {@link Serde}s to convert from &lt;K, ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+    extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+  MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner,
+                                             final String metricScope,
+                                             final Time time,
+                                             final Serde<K> keySerde,
+                                             final Serde<ValueTimestampHeaders<V>> valueSerde) {
+    super(inner, metricScope, time, keySerde, valueSerde);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde,
+                                                                      final SerdeGetter getter) {
+    if (valueSerde == null) {
+      return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde());
+    } else {
+      return super.prepareValueSerdeForStore(valueSerde, getter);
+    }
+  }
+
+  @Override
+  public ValueTimestampHeaders<V> get(final K key) {
+    Objects.requireNonNull(key, "key cannot be null");
+    try {
+      return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor);
+    } catch (final ProcessorStateException e) {
+      final String message = String.format(e.getMessage(), key);
+      throw new ProcessorStateException(message, e);
+    }
+  }
+
+  protected ValueTimestampHeaders<V> outerValue(final byte[] value) {
+    Headers headers =
+        HeadersDeserializer.deserialize(ValueTimestampHeadersDeserializer.rawHeaders(value));
+    return value != null ? serdes.valueFrom(value, headers) : null;
+  }
+
+  /**
+   * Returns both the raw serialized bytes and the deserialized {@link ValueTimestampHeaders}.
+   */
+  public RawAndDeserializedValue<V> getWithBinary(final K key, final Headers headers) {

Review Comment:
   I think we don't need to add this one -- it's an artifact of the reverted KIP-557 and unused.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
+import static org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+
+/**
+ * Change-logging wrapper for a timestamped key-value bytes store whose values also carry headers.
+ * <p>
+ * It understands the header-aware serialized value format produced by {@link ValueTimestampHeadersSerializer}.
+ * <p>
+ * Semantics:
+ *  - The inner store value format is:
+ *        [ varint header_length ][ header_bytes ][ 8-byte timestamp ][ value_bytes ]
+ *  - The changelog record value logged via {@code log(...)} remains just {@code value_bytes}
+ *    (no timestamp, no headers), and the timestamp is logged separately.
+ */
+public class ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
+    extends ChangeLoggingKeyValueBytesStore {
+
+  ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner) {
+    super(inner);
+  }
+
+  @Override
+  public void put(final Bytes key,
+                  final byte[] valueTimestampHeaders) {
+    wrapped().put(key, valueTimestampHeaders);
+    log(

Review Comment:
   Seems we need to update `log(...)` to also pass in the headers now? Otherwise, we would lose them when writing into the changelog topic?
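   
   Something along these lines (sketch; a `log(...)` variant that accepts `Headers` is hypothetical and would need to be added to the change-logging layer):
   
   ```java
   // Sketch (hypothetical): split the stored bytes apart and forward the headers,
   // so they survive the write into the changelog topic.
   @Override
   public void put(final Bytes key, final byte[] valueTimestampHeaders) {
       wrapped().put(key, valueTimestampHeaders);
       final Headers headers = HeadersDeserializer.deserialize(
           ValueTimestampHeadersDeserializer.rawHeaders(valueTimestampHeaders));
       log(key, rawValue(valueTimestampHeaders), timestamp(valueTimestampHeaders), headers);
   }
   ```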



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ *
+ * This is analogous to {@link RocksDBTimestampedStore}, but the "new" column family stores
+ * a header-aware format. Legacy values (without headers) are converted on the fly using
+ * {@link HeadersBytesStore#convertToHeaderFormat(byte[], byte[])}.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+  private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+  // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+  private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+      "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+  // New column family for header-aware timestamped values.
+  private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+      "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+  public RocksDBTimestampedStoreWithHeaders(final String name,
+                                            final String metricsScope) {
+    super(name, metricsScope);
+  }
+
+  RocksDBTimestampedStoreWithHeaders(final String name,
+                                     final String parentDir,
+                                     final RocksDBMetricsRecorder metricsRecorder) {
+    super(name, parentDir, metricsRecorder);
+  }
+
+  @Override
+  void openRocksDB(final DBOptions dbOptions,
+                   final ColumnFamilyOptions columnFamilyOptions) {
+    // We open three CFs:
+    //  - DEFAULT_COLUMN_FAMILY: required by RocksDB (not used)
+    //  - LEGACY_TIMESTAMPED_CF_NAME: legacy timestamped values (without headers)
+    //  - TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME: new header-aware format
+    //
+    // On first open with no legacy data, we just use the new CF.
+    final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+        dbOptions,
+        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+        new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, columnFamilyOptions),
+        new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions)
+    );
+
+    final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
+    final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+    final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+
+    // Close the default CF as we don't use it
+    defaultCf.close();
+
+    final RocksIterator legacyIter = db.newIterator(legacyCf);
+    legacyIter.seekToFirst();
+    if (legacyIter.isValid()) {
+      log.info("Opening store {} in upgrade mode (legacy timestamped -> 
header-aware timestamped)", name);
+      cfAccessor = new DualColumnFamilyAccessor(legacyCf, headersCf);
+    } else {
+      log.info("Opening store {} in regular header-aware mode", name);
+      cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+      legacyCf.close();
+    }
+    legacyIter.close();
+  }
+
+  /**
+   * Accessor that supports dual-column-family upgrade: legacy CF (timestamped without headers)
+   * and new CF (timestamped with headers).
+   */
+  private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {

Review Comment:
   I am wondering to what extent this code duplicates the existing one? Would be good to avoid a second copy, and instead make it generic enough to be used for both cases.
   
   We might just need a pluggable "converter" function which points to either the new `convertToHeaderFormat` or the existing `convertToTimestampedFormat`?
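   
   Rough sketch of what I mean (the extra constructor parameter is hypothetical):
   
   ```java
   // Sketch (hypothetical): inject the legacy-value conversion so the same
   // dual-CF accessor can serve both upgrade paths.
   private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
       private final ColumnFamilyHandle oldColumnFamily;
       private final ColumnFamilyHandle newColumnFamily;
       // e.g. HeadersBytesStore::convertToHeaderFormat here, or a wrapper around
       // the existing convertToTimestampedFormat for the timestamped upgrade
       private final java.util.function.BiFunction<byte[], byte[], byte[]> legacyValueConverter;
   
       private DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
                                        final ColumnFamilyHandle newColumnFamily,
                                        final java.util.function.BiFunction<byte[], byte[], byte[]> legacyValueConverter) {
           this.oldColumnFamily = oldColumnFamily;
           this.newColumnFamily = newColumnFamily;
           this.legacyValueConverter = legacyValueConverter;
       }
   
       // get(...) and the iterators would then call
       // legacyValueConverter.apply(key, legacyValue)
       // instead of the hard-coded convertToHeaderFormat(key, legacyValue).
   }
   ```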



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersSerializer.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableSerializer;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.initNullableSerializer;
+
+/**
+ * Serializer for {@link ValueTimestampHeaders}, analogous to {@link ValueAndTimestampSerializer}.
+ */
+public class ValueTimestampHeadersSerializer<V>
+    implements WrappingNullableSerializer<ValueTimestampHeaders<V>, Void, V> {
+
+  public final Serializer<V> valueSerializer;
+  private final Serializer<Long> timestampSerializer;
+  public final HeadersSerializer headersSerializer;
+
+  ValueTimestampHeadersSerializer(final Serializer<V> valueSerializer) {
+    Objects.requireNonNull(valueSerializer);
+    this.valueSerializer = valueSerializer;
+    this.timestampSerializer = new LongSerializer();
+    this.headersSerializer = new HeadersSerializer();
+  }
+
+  /**
+   * Compare two serialized records (produced by this serializer) and return true iff:
+   *  - the underlying value bytes are identical, and
+   *  - the new timestamp is strictly greater than the old timestamp.
+   * <p>
+   */
+  public static boolean valuesAndHeadersAreSameAndTimeIsIncreasing(final byte[] oldRecord,

Review Comment:
   We might not need this (seems to be KIP-557 related)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, byte[]&gt;,
+ * hence we use {@link Serde}s to convert from &lt;K, ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+    extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+  MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner,
+                                             final String metricScope,
+                                             final Time time,
+                                             final Serde<K> keySerde,
+                                             final Serde<ValueTimestampHeaders<V>> valueSerde) {
+    super(inner, metricScope, time, keySerde, valueSerde);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde,
+                                                                      final SerdeGetter getter) {
+    if (valueSerde == null) {
+      return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde());
+    } else {
+      return super.prepareValueSerdeForStore(valueSerde, getter);
+    }
+  }
+
+  @Override
+  public ValueTimestampHeaders<V> get(final K key) {
+    Objects.requireNonNull(key, "key cannot be null");
+    try {
+      return maybeMeasureLatency(() -> outerValue(wrapped().get(keyBytes(key))), time, getSensor);
+    } catch (final ProcessorStateException e) {
+      final String message = String.format(e.getMessage(), key);
+      throw new ProcessorStateException(message, e);
+    }
+  }
+
+  protected ValueTimestampHeaders<V> outerValue(final byte[] value) {
+    Headers headers =
+        HeadersDeserializer.deserialize(ValueTimestampHeadersDeserializer.rawHeaders(value));
+    return value != null ? serdes.valueFrom(value, headers) : null;
+  }
+
+  /**
+   * Returns both the raw serialized bytes and the deserialized {@link ValueTimestampHeaders}.
+   */
+  public RawAndDeserializedValue<V> getWithBinary(final K key, final Headers headers) {
+    try {
+      return maybeMeasureLatency(() -> {
+        final byte[] serializedValue = wrapped().get(keyBytes(key, headers));
+        return new RawAndDeserializedValue<>(serializedValue, outerValue(serializedValue));
+      }, time, getSensor);
+    } catch (final ProcessorStateException e) {
+      final String message = String.format(e.getMessage(), key);
+      throw new ProcessorStateException(message, e);
+    }
+  }
+
+  /**
+   * Only writes if the new serialized value differs (and timestamp increases) from the old serialized value.
+   */
+  public boolean putIfDifferentValues(final K key,

Review Comment:
   Same. Not needed



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/HeadersDeserializer.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/*
+ * Deserializer for org.apache.kafka.common.header.Headers using the KIP-1271 format:
+ *
+ * headers_bytes :=
+ *   varint header_count
+ *   repeated header_count times:
+ *     varint key_length
+ *     key_length bytes of UTF-8 key
+ *     varint value_length   // -1 means null
+ *     value_length bytes of value (if value_length >= 0)
+ */
+
+public final class HeadersDeserializer implements Deserializer<Headers> {
+
+  @Override
+  public void configure(final Map<String, ?> configs, final boolean isKey) {
+  }
+
+  @Override
+  public Headers deserialize(final String topic, final byte[] data) {
+    if (data == null || data.length == 0) {
+      return new RecordHeaders();
+    }
+
+    final Index idx = new Index();
+    final int headerCount = readVarint(data, idx);
+
+    if (headerCount < 0) {
+      throw new IllegalArgumentException("Negative headerCount: " + 
headerCount);
+    }
+
+    final RecordHeaders headers = new RecordHeaders();
+
+    for (int i = 0; i < headerCount; i++) {
+      // key length
+      final int keyLen = readVarint(data, idx);
+      if (keyLen < 0) {
+        throw new IllegalArgumentException("Negative key length: " + keyLen);
+      }
+
+      final String key = new String(data, idx.pos, keyLen, StandardCharsets.UTF_8);
+      idx.pos += keyLen;
+
+      // value length (-1 == null)
+      final int valueLen = readVarint(data, idx);
+      final byte[] value;
+      if (valueLen < 0) {
+        value = null;
+      } else {
+        value = new byte[valueLen];
+        System.arraycopy(data, idx.pos, value, 0, valueLen);
+        idx.pos += valueLen;
+      }
+
+      headers.add(key, value);
+    }
+
+    return headers;
+  }
+
+  public static Headers deserialize(final byte[] data) {
+    HeadersDeserializer deserializer = new HeadersDeserializer();
+    return deserializer.deserialize("", data);
+  }
+
+  @Override
+  public void close() {
+    // no-op
+  }
+
+  private static final class Index {
+    int pos = 0;
+  }
+
+  /**
+   * Read a signed int encoded as zigzag+LEB128 varint.
+   */
+  private static int readVarint(final byte[] data, final Index idx) {

Review Comment:
   Is there no existing code we could reuse?
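   
   E.g., `ByteUtils.readVarint(ByteBuffer)` already decodes zigzag + LEB128 varints and advances the buffer position, which would also replace the manual `Index` bookkeeping. Sketch:
   
   ```java
   // Sketch: wrap the input once and let the ByteBuffer track the read position.
   final ByteBuffer buffer = ByteBuffer.wrap(data);
   final int headerCount = ByteUtils.readVarint(buffer);
   for (int i = 0; i < headerCount; i++) {
       final int keyLen = ByteUtils.readVarint(buffer);
       final byte[] keyBytes = new byte[keyLen];
       buffer.get(keyBytes);
       // value length (-1 == null) and value bytes as before
   }
   ```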



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ *
+ * This is analogous to {@link RocksDBTimestampedStore}, but the "new" column family stores
+ * a header-aware format. Legacy values (without headers) are converted on the fly using
+ * {@link HeadersBytesStore#convertToHeaderFormat(byte[], byte[])}.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+  private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+  // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+  private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+      "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+  // New column family for header-aware timestamped values.
+  private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+      "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+  public RocksDBTimestampedStoreWithHeaders(final String name,
+                                            final String metricsScope) {
+    super(name, metricsScope);
+  }
+
+  RocksDBTimestampedStoreWithHeaders(final String name,
+                                     final String parentDir,
+                                     final RocksDBMetricsRecorder metricsRecorder) {
+    super(name, parentDir, metricsRecorder);
+  }
+
+  @Override
+  void openRocksDB(final DBOptions dbOptions,
+                   final ColumnFamilyOptions columnFamilyOptions) {
+    // We open three CFs:
+    //  - DEFAULT_COLUMN_FAMILY: required by RocksDB (not used)
+    //  - LEGACY_TIMESTAMPED_CF_NAME: legacy timestamped values (without headers)
+    //  - TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME: new header-aware format
+    //
+    // On first open with no legacy data, we just use the new CF.
+    final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+        dbOptions,
+        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+        new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, columnFamilyOptions),
+        new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions)
+    );
+
+    final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
+    final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+    final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+
+    // Close the default CF as we don't use it
+    defaultCf.close();

Review Comment:
   Just followed up with a question on the dev list on this -- it seems you do not intend to support upgrading from `KeyValueStore` to `TimestampedKeyValueStoreWithHeaders` directly? If we wanted to support this upgrade path, we would need to use this CF.
   
   Btw: in prod code (even if we don't support this upgrade), we might want to check that the default CF is indeed empty -- otherwise, there could be data loss (ie, a user did upgrade from plain kv to ts-kv but a row was never touched, and thus never migrated from the default CF to the "ts CF").
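   
   Roughly (sketch):
   
   ```java
   // Sketch: fail fast if the default CF unexpectedly contains data, instead of
   // silently losing rows that were never migrated during an earlier upgrade.
   final RocksIterator defaultIter = db.newIterator(defaultCf);
   try {
       defaultIter.seekToFirst();
       if (defaultIter.isValid()) {
           throw new ProcessorStateException(
               "Default column family of store " + name + " is not empty; "
                   + "direct upgrade from a plain KeyValueStore is not supported.");
       }
   } finally {
       defaultIter.close();
   }
   defaultCf.close();
   ```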



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java:
##########
@@ -315,12 +317,18 @@ public V get(final K key) {
         }
     }
 
+    //@Deprecated
     @Override
     public void put(final K key,
                     final V value) {
         Objects.requireNonNull(key, "key cannot be null");
         try {
-            maybeMeasureLatency(() -> wrapped().put(keyBytes(key), serdes.rawValue(value)), time, putSensor);
+            if (value instanceof ValueTimestampHeaders) {

Review Comment:
   Not sure why we need this? Is this the right abstraction?
   
   If we really cannot reuse this `put()` method, it seems cleaner to add an override to `MeteredTimestampedKeyValueStoreWithHeaders` instead of "leaking" the header code into this class?
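   
   I.e., something like this in the subclass (sketch; assumes the relevant fields/sensors of `MeteredKeyValueStore` are accessible to subclasses, and whatever header-specific handling was intended would live here):
   
   ```java
   // Sketch: keep MeteredKeyValueStore#put() generic and do the header-aware
   // handling only in the override of the header-aware store.
   @Override
   public void put(final K key, final ValueTimestampHeaders<V> value) {
       Objects.requireNonNull(key, "key cannot be null");
       try {
           maybeMeasureLatency(
               () -> wrapped().put(keyBytes(key), serdes.rawValue(value)),
               time,
               putSensor);
       } catch (final ProcessorStateException e) {
           throw new ProcessorStateException(String.format(e.getMessage(), key, value), e);
       }
   }
   ```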



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedKeyValueStoreWithHeaders.java:
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+
+
+import java.util.Objects;
+
+import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency;
+
+/**
+ * A Metered {@link TimestampedKeyValueStoreWithHeaders} wrapper that is used for recording operation metrics, and hence
+ * its inner KeyValueStore implementation does not need to provide its own metrics collecting functionality.
+ *
+ * The inner {@link KeyValueStore} of this class is of type &lt;Bytes, byte[]&gt;,
+ * hence we use {@link Serde}s to convert from &lt;K, ValueTimestampHeaders&lt;V&gt;&gt; to &lt;Bytes, byte[]&gt;.
+ *
+ * @param <K> key type
+ * @param <V> value type (wrapped in {@link ValueTimestampHeaders})
+ */
+public class MeteredTimestampedKeyValueStoreWithHeaders<K, V>
+    extends MeteredKeyValueStore<K, ValueTimestampHeaders<V>>
+    implements TimestampedKeyValueStoreWithHeaders<K, V> {
+
+  MeteredTimestampedKeyValueStoreWithHeaders(final KeyValueStore<Bytes, byte[]> inner,
+                                             final String metricScope,
+                                             final Time time,
+                                             final Serde<K> keySerde,
+                                             final Serde<ValueTimestampHeaders<V>> valueSerde) {
+    super(inner, metricScope, time, keySerde, valueSerde);
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected Serde<ValueTimestampHeaders<V>> prepareValueSerdeForStore(final Serde<ValueTimestampHeaders<V>> valueSerde,
+                                                                      final SerdeGetter getter) {
+    if (valueSerde == null) {
+      return new ValueTimestampHeadersSerde<>((Serde<V>) getter.valueSerde());
+    } else {
+      return super.prepareValueSerdeForStore(valueSerde, getter);
+    }
+  }
+
+  @Override
+  public ValueTimestampHeaders<V> get(final K key) {

Review Comment:
   Seems this is the exact same code as in `MeteredKeyValueStore`. Why do we need this override?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ *
+ * This is analogous to {@link RocksDBTimestampedStore}, but the "new" column family stores
+ * a header-aware format. Legacy values (without headers) are converted on the fly using
+ * {@link HeadersBytesStore#convertToHeaderFormat(byte[], byte[])}.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+  private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+  // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+  private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+      "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+  // New column family for header-aware timestamped values.
+  private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+      "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+  public RocksDBTimestampedStoreWithHeaders(final String name,
+                                            final String metricsScope) {
+    super(name, metricsScope);
+  }
+
+  RocksDBTimestampedStoreWithHeaders(final String name,
+                                     final String parentDir,
+                                     final RocksDBMetricsRecorder metricsRecorder) {
+    super(name, parentDir, metricsRecorder);
+  }
+
+  @Override
+  void openRocksDB(final DBOptions dbOptions,
+                   final ColumnFamilyOptions columnFamilyOptions) {
+    // We open three CFs:
+    //  - DEFAULT_COLUMN_FAMILY: required by RocksDB (not used)
+    //  - LEGACY_TIMESTAMPED_CF_NAME: legacy timestamped values (without headers)
+    //  - TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME: new header-aware format
+    //
+    // On first open with no legacy data, we just use the new CF.
+    final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+        dbOptions,
+        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+        new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, columnFamilyOptions),
+        new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions)
+    );
+
+    final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
+    final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+    final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+
+    // Close the default CF as we don't use it
+    defaultCf.close();
+
+    final RocksIterator legacyIter = db.newIterator(legacyCf);
+    legacyIter.seekToFirst();
+    if (legacyIter.isValid()) {
+      log.info("Opening store {} in upgrade mode (legacy timestamped -> 
header-aware timestamped)", name);
+      cfAccessor = new DualColumnFamilyAccessor(legacyCf, headersCf);
+    } else {
+      log.info("Opening store {} in regular header-aware mode", name);
+      cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+      legacyCf.close();
+    }
+    legacyIter.close();
+  }
+
+  /**
+   * Accessor that supports dual-column-family upgrade: legacy CF (timestamped without headers)
+   * and new CF (timestamped with headers).
+   */
+  private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
+
+    private final ColumnFamilyHandle oldColumnFamily;
+    private final ColumnFamilyHandle newColumnFamily;
+
+    private DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
+                                     final ColumnFamilyHandle newColumnFamily) {
+      this.oldColumnFamily = oldColumnFamily;
+      this.newColumnFamily = newColumnFamily;
+    }
+
+    @Override
+    public void put(final DBAccessor accessor,
+                    final byte[] key,
+                    final byte[] valueWithHeaders) {
+      synchronized (position) {
+        if (valueWithHeaders == null) {
+          try {
+            accessor.delete(oldColumnFamily, key);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + name, e);
+          }
+          try {
+            accessor.delete(newColumnFamily, key);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + name, e);
+          }
+        } else {
+          try {
+            accessor.delete(oldColumnFamily, key);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + name, e);
+          }
+          try {
+            accessor.put(newColumnFamily, key, valueWithHeaders);
+            StoreQueryUtils.updatePosition(position, context);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while putting key/value 
into store " + name, e);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                             final WriteBatchInterface batch) throws 
RocksDBException {
+      for (final KeyValue<Bytes, byte[]> entry : entries) {
+        Objects.requireNonNull(entry.key, "key cannot be null");
+        addToBatch(entry.key.get(), entry.value, batch);
+      }
+    }
+
+    @Override
+    public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
+      return get(accessor, key, Optional.empty());
+    }
+
+    @Override
+    public byte[] get(final DBAccessor accessor,
+                      final byte[] key,
+                      final ReadOptions readOptions) throws RocksDBException {
+      return get(accessor, key, Optional.of(readOptions));
+    }
+
+    private byte[] get(final DBAccessor accessor,
+                       final byte[] key,
+                       final Optional<ReadOptions> readOptions) throws RocksDBException {
+      final byte[] valueWithHeaders =
+          readOptions.isPresent()
+              ? accessor.get(newColumnFamily, readOptions.get(), key)
+              : accessor.get(newColumnFamily, key);
+
+      if (valueWithHeaders != null) {
+        return valueWithHeaders;
+      }
+
+      final byte[] legacyValue =
+          readOptions.isPresent()
+              ? accessor.get(oldColumnFamily, readOptions.get(), key)
+              : accessor.get(oldColumnFamily, key);
+
+      if (legacyValue != null) {
+        // Convert legacy timestamped value into new header-aware format.
+        final byte[] converted = convertToHeaderFormat(key, legacyValue);
+        // We can eagerly write back the converted value to new CF.
+        put(accessor, key, converted);
+        return converted;
+      }
+
+      return null;
+    }
+
+    @Override
+    public byte[] getOnly(final DBAccessor accessor,
+                          final byte[] key) throws RocksDBException {
+      final byte[] valueWithHeaders = accessor.get(newColumnFamily, key);
+      if (valueWithHeaders != null) {
+        return valueWithHeaders;
+      }
+
+      final byte[] legacyValue = accessor.get(oldColumnFamily, key);
+      if (legacyValue != null) {
+        // For "getOnly", we must NOT mutate state; just convert on the fly.
+        return convertToHeaderFormat(key, legacyValue);
+      }
+      return null;
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
+                                                        final Bytes from,
+                                                        final Bytes to,
+                                                        final boolean forward) {
+      return new RocksDBDualCFRangeIterator(
+          name,
+          accessor.newIterator(newColumnFamily),
+          accessor.newIterator(oldColumnFamily),
+          from,
+          to,
+          forward,
+          /* toInclusive = */ true
+      );
+    }
+
+    @Override
+    public void deleteRange(final DBAccessor accessor,
+                            final byte[] from,
+                            final byte[] to) {
+      try {
+        accessor.deleteRange(oldColumnFamily, from, to);
+      } catch (final RocksDBException e) {
+        throw new ProcessorStateException("Error while removing key from store 
" + name, e);
+      }
+      try {
+        accessor.deleteRange(newColumnFamily, from, to);
+      } catch (final RocksDBException e) {
+        throw new ProcessorStateException("Error while removing key from store 
" + name, e);
+      }
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor,
+                                                      final boolean forward) {
+      final RocksIterator iterWithHeaders = accessor.newIterator(newColumnFamily);
+      final RocksIterator iterLegacy = accessor.newIterator(oldColumnFamily);
+      if (forward) {
+        iterWithHeaders.seekToFirst();
+        iterLegacy.seekToFirst();
+      } else {
+        iterWithHeaders.seekToLast();
+        iterLegacy.seekToLast();
+      }
+      return new RocksDBDualCFIterator(name, iterWithHeaders, iterLegacy, forward);
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor,
+                                                             final Bytes prefix) {
+      final Bytes to = incrementWithoutOverflow(prefix);
+      return new RocksDBDualCFRangeIterator(
+          name,
+          accessor.newIterator(newColumnFamily),
+          accessor.newIterator(oldColumnFamily),
+          prefix,
+          to,
+          true,
+          /* toInclusive = */ false
+      );
+    }
+
+    @Override
+    public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
+      return accessor.approximateNumEntries(oldColumnFamily)
+          + accessor.approximateNumEntries(newColumnFamily);
+    }
+
+    @Override
+    public void flush(final DBAccessor accessor) throws RocksDBException {
+      accessor.flush(oldColumnFamily, newColumnFamily);
+    }
+
+    @Override
+    public void addToBatch(final byte[] key,
+                           final byte[] value,
+                           final WriteBatchInterface batch) throws RocksDBException {
+      if (value == null) {
+        batch.delete(oldColumnFamily, key);
+        batch.delete(newColumnFamily, key);
+      } else {
+        batch.delete(oldColumnFamily, key);
+        batch.put(newColumnFamily, key, value);
+      }
+    }
+
+    @Override
+    public void close() {
+      oldColumnFamily.close();
+      newColumnFamily.close();
+    }
+  }
+
+  /**
+   * Iterator that merges the new (header-aware) CF and legacy CF, converting legacy values
+   * to the new format on the fly.
+   */
+  private static class RocksDBDualCFIterator

Review Comment:
   Same question. Can we share this code instead of duplicating it?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBKeyValueBytesStoreSupplier.java:
##########
@@ -24,11 +24,20 @@ public class RocksDBKeyValueBytesStoreSupplier implements KeyValueBytesStoreSupp
 
     private final String name;
     private final boolean returnTimestampedStore;
+    private final boolean hasHeaders;
 
     public RocksDBKeyValueBytesStoreSupplier(final String name,
                                             final boolean returnTimestampedStore) {
         this.name = name;
         this.returnTimestampedStore = returnTimestampedStore;
+        this.hasHeaders = false;

Review Comment:
   Wondering if it might be cleaner to just stay with one constructor. Sure, some caller code must be updated, but overall it seems cleaner?
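   
   A sketch of the single-constructor variant (field names as in this diff; the
   call site shown is illustrative):
   
   ```java
   public RocksDBKeyValueBytesStoreSupplier(final String name,
                                            final boolean returnTimestampedStore,
                                            final boolean hasHeaders) {
       this.name = name;
       this.returnTimestampedStore = returnTimestampedStore;
       this.hasHeaders = hasHeaders;
   }
   ```
   
   Existing callers would then pass the flag explicitly, e.g.
   `new RocksDBKeyValueBytesStoreSupplier(name, true, false)`, which also makes
   the header behavior visible at every call site.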



##########
streams/src/main/java/org/apache/kafka/streams/state/HeadersBytesStore.java:
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state;
+
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Marker interface to indicate that a bytes store understands the value-with-headers format
+ * and can convert legacy "plain value" entries to the new format.
+ * <p>
+ * Per the KIP, the value format is: [header_length(varint)][headers_bytes][payload_bytes]
+ * where payload_bytes is the existing serialized value (e.g., [timestamp(8)][value] for timestamped stores).
+ */
+public interface HeadersBytesStore {
+
+  /**
+   * Converts a legacy value (without headers) to the header-embedded format.
+   * <p>
+   * For timestamped stores, the legacy format is: [timestamp(8)][value]
+   * The new format is: [header_length(varint)][headers][timestamp(8)][value]
+   * <p>
+   * This method adds empty headers to the existing value format.
+   *
+   * @param key   the key bytes (may be used for context-dependent conversion; typically unused)
+   * @param value the legacy value bytes (for timestamped stores: [timestamp(8)][value])
+   * @return the value in header-embedded format with empty headers
+   */
+    static byte[] convertToHeaderFormat(final byte[] key, final byte[] value) {
+        if (value == null) {
+            return null;
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+             DataOutputStream out = new DataOutputStream(baos)) {
+
+            // Empty headers serialize to an empty byte array (per HeadersSerializer.serialize())
+            final byte[] emptyHeadersBytes = new byte[0];
+
+            // Write format: [headers_size(varint)][headers_bytes][payload]
+            ByteUtils.writeVarint(emptyHeadersBytes.length, out);  // headers_size = 0

Review Comment:
   Aha! Here is the code we should reuse everywhere... :) ?
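   
   For example (a sketch, not code from this PR): any spot that builds the
   `[headers_size(varint)][headers_bytes][payload]` framing by hand for a legacy
   value could delegate to this helper instead:
   
   ```java
   // upgrade a legacy value read from the old column family;
   // convertToHeaderFormat already returns null for a null input
   final byte[] legacyValue = accessor.get(oldColumnFamily, key);
   final byte[] upgraded = HeadersBytesStore.convertToHeaderFormat(key, legacyValue);
   ```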



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedStoreWithHeaders.java:
##########
@@ -0,0 +1,531 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.errors.InvalidStateStoreException;
+import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.ReadOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatchInterface;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+
+import static org.apache.kafka.streams.state.HeadersBytesStore.convertToHeaderFormat;
+
+/**
+ * A persistent key-(value-timestamp-headers) store based on RocksDB.
+ *
+ * This is analogous to {@link RocksDBTimestampedStore}, but the "new" column family stores
+ * a header-aware format. Legacy values (without headers) are converted on the fly using
+ * {@link HeadersBytesStore#convertToHeaderFormat(byte[], byte[])}.
+ */
+public class RocksDBTimestampedStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+
+  private static final Logger log = LoggerFactory.getLogger(RocksDBTimestampedStoreWithHeaders.class);
+
+  // Legacy column family name - must match RocksDBTimestampedStore.TIMESTAMPED_VALUES_COLUMN_FAMILY_NAME
+  private static final byte[] LEGACY_TIMESTAMPED_CF_NAME =
+      "keyValueWithTimestamp".getBytes(StandardCharsets.UTF_8);
+
+  // New column family for header-aware timestamped values.
+  private static final byte[] TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME =
+      "keyValueWithTimestampAndHeaders".getBytes(StandardCharsets.UTF_8);
+
+  public RocksDBTimestampedStoreWithHeaders(final String name,
+                                            final String metricsScope) {
+    super(name, metricsScope);
+  }
+
+  RocksDBTimestampedStoreWithHeaders(final String name,
+                                     final String parentDir,
+                                     final RocksDBMetricsRecorder metricsRecorder) {
+    super(name, parentDir, metricsRecorder);
+  }
+
+  @Override
+  void openRocksDB(final DBOptions dbOptions,
+                   final ColumnFamilyOptions columnFamilyOptions) {
+    // We open three CFs:
+    //  - DEFAULT_COLUMN_FAMILY: required by RocksDB (not used)
+    //  - LEGACY_TIMESTAMPED_CF_NAME: legacy timestamped values (without headers)
+    //  - TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME: new header-aware format
+    //
+    // On first open with no legacy data, we just use the new CF.
+    final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+        dbOptions,
+        new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+        new ColumnFamilyDescriptor(LEGACY_TIMESTAMPED_CF_NAME, columnFamilyOptions),
+        new ColumnFamilyDescriptor(TIMESTAMPED_VALUES_WITH_HEADERS_CF_NAME, columnFamilyOptions)
+    );
+
+    final ColumnFamilyHandle defaultCf = columnFamilies.get(0);
+    final ColumnFamilyHandle legacyCf = columnFamilies.get(1);
+    final ColumnFamilyHandle headersCf = columnFamilies.get(2);
+
+    // Close the default CF as we don't use it
+    defaultCf.close();
+
+    final RocksIterator legacyIter = db.newIterator(legacyCf);
+    legacyIter.seekToFirst();
+    if (legacyIter.isValid()) {
+      log.info("Opening store {} in upgrade mode (legacy timestamped -> header-aware timestamped)", name);
+      cfAccessor = new DualColumnFamilyAccessor(legacyCf, headersCf);
+    } else {
+      log.info("Opening store {} in regular header-aware mode", name);
+      cfAccessor = new SingleColumnFamilyAccessor(headersCf);
+      legacyCf.close();
+    }
+    legacyIter.close();
+  }
+
+  /**
+   * Accessor that supports dual-column-family upgrade: legacy CF (timestamped without headers)
+   * and new CF (timestamped with headers).
+   */
+  private class DualColumnFamilyAccessor implements ColumnFamilyAccessor {
+
+    private final ColumnFamilyHandle oldColumnFamily;
+    private final ColumnFamilyHandle newColumnFamily;
+
+    private DualColumnFamilyAccessor(final ColumnFamilyHandle oldColumnFamily,
+                                     final ColumnFamilyHandle newColumnFamily) {
+      this.oldColumnFamily = oldColumnFamily;
+      this.newColumnFamily = newColumnFamily;
+    }
+
+    @Override
+    public void put(final DBAccessor accessor,
+                    final byte[] key,
+                    final byte[] valueWithHeaders) {
+      synchronized (position) {
+        if (valueWithHeaders == null) {
+          try {
+            accessor.delete(oldColumnFamily, key);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + name, e);
+          }
+          try {
+            accessor.delete(newColumnFamily, key);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + name, e);
+          }
+        } else {
+          try {
+            accessor.delete(oldColumnFamily, key);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while removing key from store " + name, e);
+          }
+          try {
+            accessor.put(newColumnFamily, key, valueWithHeaders);
+            StoreQueryUtils.updatePosition(position, context);
+          } catch (final RocksDBException e) {
+            throw new ProcessorStateException("Error while putting key/value into store " + name, e);
+          }
+        }
+      }
+    }
+
+    @Override
+    public void prepareBatch(final List<KeyValue<Bytes, byte[]>> entries,
+                             final WriteBatchInterface batch) throws RocksDBException {
+      for (final KeyValue<Bytes, byte[]> entry : entries) {
+        Objects.requireNonNull(entry.key, "key cannot be null");
+        addToBatch(entry.key.get(), entry.value, batch);
+      }
+    }
+
+    @Override
+    public byte[] get(final DBAccessor accessor, final byte[] key) throws RocksDBException {
+      return get(accessor, key, Optional.empty());
+    }
+
+    @Override
+    public byte[] get(final DBAccessor accessor,
+                      final byte[] key,
+                      final ReadOptions readOptions) throws RocksDBException {
+      return get(accessor, key, Optional.of(readOptions));
+    }
+
+    private byte[] get(final DBAccessor accessor,
+                       final byte[] key,
+                       final Optional<ReadOptions> readOptions) throws RocksDBException {
+      final byte[] valueWithHeaders =
+          readOptions.isPresent()
+              ? accessor.get(newColumnFamily, readOptions.get(), key)
+              : accessor.get(newColumnFamily, key);
+
+      if (valueWithHeaders != null) {
+        return valueWithHeaders;
+      }
+
+      final byte[] legacyValue =
+          readOptions.isPresent()
+              ? accessor.get(oldColumnFamily, readOptions.get(), key)
+              : accessor.get(oldColumnFamily, key);
+
+      if (legacyValue != null) {
+        // Convert legacy timestamped value into new header-aware format.
+        final byte[] converted = convertToHeaderFormat(key, legacyValue);
+        // We can eagerly write back the converted value to new CF.
+        put(accessor, key, converted);
+        return converted;
+      }
+
+      return null;
+    }
+
+    @Override
+    public byte[] getOnly(final DBAccessor accessor,
+                          final byte[] key) throws RocksDBException {
+      final byte[] valueWithHeaders = accessor.get(newColumnFamily, key);
+      if (valueWithHeaders != null) {
+        return valueWithHeaders;
+      }
+
+      final byte[] legacyValue = accessor.get(oldColumnFamily, key);
+      if (legacyValue != null) {
+        // For "getOnly", we must NOT mutate state; just convert on the fly.
+        return convertToHeaderFormat(key, legacyValue);
+      }
+      return null;
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> range(final DBAccessor accessor,
+                                                        final Bytes from,
+                                                        final Bytes to,
+                                                        final boolean forward) {
+      return new RocksDBDualCFRangeIterator(
+          name,
+          accessor.newIterator(newColumnFamily),
+          accessor.newIterator(oldColumnFamily),
+          from,
+          to,
+          forward,
+          /* toInclusive = */ true
+      );
+    }
+
+    @Override
+    public void deleteRange(final DBAccessor accessor,
+                            final byte[] from,
+                            final byte[] to) {
+      try {
+        accessor.deleteRange(oldColumnFamily, from, to);
+      } catch (final RocksDBException e) {
+        throw new ProcessorStateException("Error while removing key from store " + name, e);
+      }
+      try {
+        accessor.deleteRange(newColumnFamily, from, to);
+      } catch (final RocksDBException e) {
+        throw new ProcessorStateException("Error while removing key from store " + name, e);
+      }
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> all(final DBAccessor accessor,
+                                                      final boolean forward) {
+      final RocksIterator iterWithHeaders = accessor.newIterator(newColumnFamily);
+      final RocksIterator iterLegacy = accessor.newIterator(oldColumnFamily);
+      if (forward) {
+        iterWithHeaders.seekToFirst();
+        iterLegacy.seekToFirst();
+      } else {
+        iterWithHeaders.seekToLast();
+        iterLegacy.seekToLast();
+      }
+      return new RocksDBDualCFIterator(name, iterWithHeaders, iterLegacy, forward);
+    }
+
+    @Override
+    public ManagedKeyValueIterator<Bytes, byte[]> prefixScan(final DBAccessor accessor,
+                                                             final Bytes prefix) {
+      final Bytes to = incrementWithoutOverflow(prefix);
+      return new RocksDBDualCFRangeIterator(
+          name,
+          accessor.newIterator(newColumnFamily),
+          accessor.newIterator(oldColumnFamily),
+          prefix,
+          to,
+          true,
+          /* toInclusive = */ false
+      );
+    }
+
+    @Override
+    public long approximateNumEntries(final DBAccessor accessor) throws RocksDBException {
+      return accessor.approximateNumEntries(oldColumnFamily)
+          + accessor.approximateNumEntries(newColumnFamily);
+    }
+
+    @Override
+    public void flush(final DBAccessor accessor) throws RocksDBException {
+      accessor.flush(oldColumnFamily, newColumnFamily);
+    }
+
+    @Override
+    public void addToBatch(final byte[] key,
+                           final byte[] value,
+                           final WriteBatchInterface batch) throws RocksDBException {
+      if (value == null) {
+        batch.delete(oldColumnFamily, key);
+        batch.delete(newColumnFamily, key);
+      } else {
+        batch.delete(oldColumnFamily, key);
+        batch.put(newColumnFamily, key, value);
+      }
+    }
+
+    @Override
+    public void close() {
+      oldColumnFamily.close();
+      newColumnFamily.close();
+    }
+  }
+
+  /**
+   * Iterator that merges the new (header-aware) CF and legacy CF, converting legacy values
+   * to the new format on the fly.
+   */
+  private static class RocksDBDualCFIterator
+      extends AbstractIterator<KeyValue<Bytes, byte[]>>
+      implements ManagedKeyValueIterator<Bytes, byte[]> {
+
+    // RocksDB's JNI interface does not expose getters/setters that allow the
+    // comparator to be pluggable, and the default is lexicographic, so it's
+    // safe to just force lexicographic comparator here for now.
+    private final Comparator<byte[]> comparator = Bytes.BYTES_LEXICO_COMPARATOR;
+
+    private final String storeName;
+    private final RocksIterator iterWithHeaders;
+    private final RocksIterator iterLegacy;
+    private final boolean forward;
+
+    private volatile boolean open = true;
+
+    private byte[] nextWithHeaders;
+    private byte[] nextLegacy;
+    private KeyValue<Bytes, byte[]> next;
+    private Runnable closeCallback = null;
+
+    RocksDBDualCFIterator(final String storeName,
+                          final RocksIterator iterWithHeaders,
+                          final RocksIterator iterLegacy,
+                          final boolean forward) {
+      this.iterWithHeaders = iterWithHeaders;
+      this.iterLegacy = iterLegacy;
+      this.storeName = storeName;
+      this.forward = forward;
+    }
+
+    @Override
+    public synchronized boolean hasNext() {
+      if (!open) {
+        throw new InvalidStateStoreException(
+            String.format("RocksDB iterator for store %s has closed", storeName)
+        );
+      }
+      return super.hasNext();
+    }
+
+    @Override
+    public synchronized KeyValue<Bytes, byte[]> next() {
+      return super.next();
+    }
+
+    @Override
+    protected KeyValue<Bytes, byte[]> makeNext() {
+      if (nextLegacy == null && iterLegacy.isValid()) {
+        nextLegacy = iterLegacy.key();
+      }
+      if (nextWithHeaders == null && iterWithHeaders.isValid()) {
+        nextWithHeaders = iterWithHeaders.key();
+      }
+
+      if (nextLegacy == null && !iterLegacy.isValid()) {
+        if (nextWithHeaders == null && !iterWithHeaders.isValid()) {
+          return allDone();
+        } else {
+          next = KeyValue.pair(new Bytes(nextWithHeaders), iterWithHeaders.value());
+          nextWithHeaders = null;
+          if (forward) {
+            iterWithHeaders.next();
+          } else {
+            iterWithHeaders.prev();
+          }
+        }
+      } else {
+        if (nextWithHeaders == null) {
+          next = KeyValue.pair(
+              new Bytes(nextLegacy),
+              convertToHeaderFormat(iterLegacy.key(), iterLegacy.value())
+          );
+          nextLegacy = null;
+          if (forward) {
+            iterLegacy.next();
+          } else {
+            iterLegacy.prev();
+          }
+        } else {
+          if (forward) {
+            if (comparator.compare(nextLegacy, nextWithHeaders) <= 0) {
+              next = KeyValue.pair(
+                  new Bytes(nextLegacy),
+                  convertToHeaderFormat(iterLegacy.key(), iterLegacy.value())
+              );
+              nextLegacy = null;
+              iterLegacy.next();
+            } else {
+              next = KeyValue.pair(new Bytes(nextWithHeaders), iterWithHeaders.value());
+              nextWithHeaders = null;
+              iterWithHeaders.next();
+            }
+          } else {
+            if (comparator.compare(nextLegacy, nextWithHeaders) >= 0) {
+              next = KeyValue.pair(
+                  new Bytes(nextLegacy),
+                  convertToHeaderFormat(iterLegacy.key(), iterLegacy.value())
+              );
+              nextLegacy = null;
+              iterLegacy.prev();
+            } else {
+              next = KeyValue.pair(new Bytes(nextWithHeaders), iterWithHeaders.value());
+              nextWithHeaders = null;
+              iterWithHeaders.prev();
+            }
+          }
+        }
+      }
+      return next;
+    }
+
+    @Override
+    public synchronized void close() {
+      if (closeCallback == null) {
+        throw new IllegalStateException(
+            "RocksDBDualCFIterator expects close callback to be set immediately upon creation"
+        );
+      }
+      closeCallback.run();
+      iterLegacy.close();
+      iterWithHeaders.close();
+      open = false;
+    }
+
+    @Override
+    public Bytes peekNextKey() {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+      return next.key;
+    }
+
+    @Override
+    public void onClose(final Runnable closeCallback) {
+      this.closeCallback = closeCallback;
+    }
+  }
+
+  private static class RocksDBDualCFRangeIterator extends RocksDBDualCFIterator {

Review Comment:
   Same here, I guess: can this range-iterator code be shared as well?
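   
   Building on the SharedDualCFIterator sketch above (names still illustrative):
   if the base iterator takes the converter as a constructor argument, the range
   variant only forwards it, so neither class needs a per-store copy.
   
   ```java
   import org.rocksdb.RocksIterator;
   
   import java.util.function.BiFunction;
   
   // Sketch: the range iterator adds bounds handling but inherits the converter.
   class SharedDualCFRangeIterator extends SharedDualCFIterator {
       SharedDualCFRangeIterator(final RocksIterator newCfIter,
                                 final RocksIterator legacyCfIter,
                                 final BiFunction<byte[], byte[], byte[]> legacyConverter) {
           super(newCfIter, legacyCfIter, legacyConverter);
           // from/to/forward/toInclusive handling omitted for brevity
       }
   }
   
   // this store:        HeadersBytesStore::convertToHeaderFormat
   // timestamped store: (k, v) -> TimestampedBytesStore.convertToTimestampedFormat(v)
   ```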



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
