This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c00d1c7cb8d KAFKA-20158 RocksDB session stores with headers support 
(4/N) (#21571)
c00d1c7cb8d is described below

commit c00d1c7cb8db8b5a86b10ff2f15704daa902103c
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Feb 27 09:32:50 2026 -0500

    KAFKA-20158 RocksDB session stores with headers support (4/N) (#21571)
    
    Adds headers support to `RocksDBSessionStore` for
    [KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)
    and includes tests for lazily converting the storage format when a
    store is opened in upgrade mode.
---
 .../org/apache/kafka/streams/state/Stores.java     |  27 +-
 .../RocksDBMigratingSessionStoreWithHeaders.java   |  86 ++++
 .../internals/RocksDBSessionStoreWithHeaders.java  |  45 ++
 .../RocksDBTimeOrderedSessionStoreWithHeaders.java |  45 ++
 .../RocksDbSessionBytesStoreSupplier.java          |  31 +-
 ...ocksDbTimeOrderedSessionBytesStoreSupplier.java |  18 +-
 ...ssionRocksDBSegmentedBytesStoreWithHeaders.java |  36 ++
 .../state/internals/SessionSegmentWithHeaders.java |  89 ++++
 .../internals/SessionSegmentsWithHeaders.java      |  62 +++
 .../internals/AbstractSessionBytesStoreTest.java   |  46 ++
 ...ocksDBMigratingSessionStoreWithHeadersTest.java | 512 +++++++++++++++++++++
 .../RocksDBSessionStoreWithHeadersTest.java        |  33 ++
 ...ksDBTimeOrderedSessionStoreWithHeadersTest.java |  42 ++
 .../internals/SessionSegmentWithHeadersTest.java   | 140 ++++++
 .../internals/SessionSegmentsWithHeadersTest.java  | 197 ++++++++
 15 files changed, 1398 insertions(+), 11 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java 
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index e36fe042b6a..b1e8234f3c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -440,13 +440,38 @@ public final class Stores {
      */
     public static SessionBytesStoreSupplier persistentSessionStore(final 
String name,
                                                                    final 
Duration retentionPeriod) {
+        return persistentSessionStore(name, retentionPeriod, false);
+    }
+
+    /**
+     * Create a persistent {@link SessionBytesStoreSupplier} with support for 
record headers.
+     * <p>
+     * Note that it is not safe to change the value of {@code retentionPeriod} 
between
+     * application restarts without clearing local state from application 
instances,
+     * as this may cause incorrect values to be read from the state store if 
it impacts
+     * the underlying storage format.
+     *
+     * @param name              name of the store (cannot be {@code null})
+     * @param retentionPeriod   length of time to retain data in the store 
(cannot be negative)
+     *                          (note that the retention period must be at 
least long enough to
+     *                          contain the inactivity gap of the session and 
the entire grace period.)
+     * @return an instance of a {@link SessionBytesStoreSupplier}
+     */
+    public static SessionBytesStoreSupplier 
persistentSessionStoreWithHeaders(final String name,
+                                                                              
final Duration retentionPeriod) {
+        return persistentSessionStore(name, retentionPeriod, true);
+    }
+
+    private static SessionBytesStoreSupplier persistentSessionStore(final 
String name,
+                                                                    final 
Duration retentionPeriod,
+                                                                    final 
boolean withHeaders) {
         Objects.requireNonNull(name, "name cannot be null");
         final String msgPrefix = 
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
         final long retentionPeriodMs = 
validateMillisecondDuration(retentionPeriod, msgPrefix);
         if (retentionPeriodMs < 0) {
             throw new IllegalArgumentException("retentionPeriod cannot be 
negative");
         }
-        return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs);
+        return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs, 
withHeaders);
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
new file mode 100644
index 00000000000..d0534915a06
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent RocksDB-based store with headers support, used for session store data.
+ * <p>
+ * This store provides a migration path from plain {@link RocksDBStore} (DEFAULT column family)
+ * to a headers-aware column family ({@code sessionKeyValueWithHeaders}). It uses
+ * {@link DualColumnFamilyAccessor} for lazy migration when legacy data exists in the DEFAULT CF;
+ * otherwise only the headers-aware column family is accessed.
+ */
+public class RocksDBMigratingSessionStoreWithHeaders extends RocksDBStore implements HeadersBytesStore {
+    private static final Logger log = LoggerFactory.getLogger(RocksDBMigratingSessionStoreWithHeaders.class);
+
+    /** Name of the column family that holds values in the headers-aware format. */
+    static final byte[] SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME =
+        "sessionKeyValueWithHeaders".getBytes(StandardCharsets.UTF_8);
+
+    public RocksDBMigratingSessionStoreWithHeaders(final String name,
+                                                   final String metricsScope) {
+        super(name, metricsScope);
+    }
+
+    RocksDBMigratingSessionStoreWithHeaders(final String name,
+                                            final String parentDir,
+                                            final RocksDBMetricsRecorder metricsRecorder) {
+        super(name, parentDir, metricsRecorder);
+    }
+
+    /**
+     * Opens the database with both the DEFAULT and the headers-aware column family and
+     * selects the column family accessor.
+     * <p>
+     * If the DEFAULT column family contains at least one entry, the store is opened in
+     * "upgrade mode" with a {@link DualColumnFamilyAccessor}. Otherwise it is opened in
+     * "regular mode" with a {@link SingleColumnFamilyAccessor} on the headers-aware column
+     * family, and the unused DEFAULT column family handle is closed.
+     *
+     * @param dbOptions           options used to open the database
+     * @param columnFamilyOptions options applied to both column families
+     */
+    @Override
+    void openRocksDB(final DBOptions dbOptions,
+                     final ColumnFamilyOptions columnFamilyOptions) {
+        final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+            dbOptions,
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME, columnFamilyOptions)
+        );
+        final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
+        final ColumnFamilyHandle withHeadersColumnFamily = columnFamilies.get(1);
+
+        // Probe first and release the iterator before touching the handles, so the iterator
+        // is never open while its column family handle is being closed.
+        if (containsData(noHeadersColumnFamily)) {
+            log.info("Opening store {} in upgrade mode", name);
+            cfAccessor = new DualColumnFamilyAccessor(
+                noHeadersColumnFamily,
+                withHeadersColumnFamily,
+                HeadersBytesStore::convertToHeaderFormat,
+                this
+            );
+        } else {
+            log.info("Opening store {} in regular mode", name);
+            cfAccessor = new SingleColumnFamilyAccessor(withHeadersColumnFamily);
+            noHeadersColumnFamily.close();
+        }
+    }
+
+    /**
+     * Returns whether the given column family holds at least one entry. The iterator used
+     * for the probe is always closed, even if the probe throws.
+     */
+    private boolean containsData(final ColumnFamilyHandle columnFamily) {
+        try (final RocksIterator iter = db.newIterator(columnFamily)) {
+            iter.seekToFirst();
+            return iter.isValid();
+        }
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
new file mode 100644
index 00000000000..75d0ed17eff
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+
+/**
+ * Session store backed by RocksDB whose values carry record headers.
+ * <p>
+ * Extends {@link RocksDBSessionStore}; the only override is {@code query()},
+ * which always throws so that IQv2 is disabled for header-aware stores.
+ * <p>
+ * Value layout: [headersSize(varint)][headersBytes][aggregationBytes]
+ *
+ * @see RocksDBSessionStore
+ */
+class RocksDBSessionStoreWithHeaders extends RocksDBSessionStore implements HeadersBytesStore {
+
+    RocksDBSessionStoreWithHeaders(final SegmentedBytesStore bytesStore) {
+        super(bytesStore);
+    }
+
+    /**
+     * Not supported for header-aware stores.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        throw new UnsupportedOperationException("Querying stores with headers is not supported");
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
new file mode 100644
index 00000000000..2d5ab757804
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+
+/**
+ * Time-ordered session store backed by RocksDB whose values carry record headers.
+ * <p>
+ * Extends {@link RocksDBTimeOrderedSessionStore}; the only override is {@code query()},
+ * which always throws so that IQv2 is disabled for header-aware stores.
+ * <p>
+ * Value layout: [headersSize(varint)][headersBytes][aggregationBytes]
+ *
+ * @see RocksDBTimeOrderedSessionStore
+ */
+class RocksDBTimeOrderedSessionStoreWithHeaders extends RocksDBTimeOrderedSessionStore implements HeadersBytesStore {
+
+    RocksDBTimeOrderedSessionStoreWithHeaders(final RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+        super(store);
+    }
+
+    /**
+     * Not supported for header-aware stores.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public <R> QueryResult<R> query(final Query<R> query,
+                                    final PositionBound positionBound,
+                                    final QueryConfig config) {
+        throw new UnsupportedOperationException("Querying stores with headers is not supported");
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 684ebf41431..6bd542ecf93 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -23,11 +23,19 @@ import org.apache.kafka.streams.state.SessionStore;
 public class RocksDbSessionBytesStoreSupplier implements 
SessionBytesStoreSupplier {
     private final String name;
     private final long retentionPeriod;
+    private final boolean withHeaders;
 
     public RocksDbSessionBytesStoreSupplier(final String name,
                                             final long retentionPeriod) {
+        this(name, retentionPeriod, false);
+    }
+
+    public RocksDbSessionBytesStoreSupplier(final String name,
+                                            final long retentionPeriod,
+                                            final boolean withHeaders) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
+        this.withHeaders = withHeaders;
     }
 
     @Override
@@ -37,13 +45,22 @@ public class RocksDbSessionBytesStoreSupplier implements 
SessionBytesStoreSuppli
 
     @Override
     public SessionStore<Bytes, byte[]> get() {
-        final RocksDBSegmentedBytesStore segmented = new 
RocksDBSegmentedBytesStore(
-            name,
-            metricsScope(),
-            retentionPeriod,
-            segmentIntervalMs(),
-            new SessionKeySchema());
-        return new RocksDBSessionStore(segmented);
+        if (withHeaders) {
+            return new RocksDBSessionStoreWithHeaders(
+                new SessionRocksDBSegmentedBytesStoreWithHeaders(
+                    name,
+                    metricsScope(),
+                    retentionPeriod,
+                    segmentIntervalMs(),
+                    new SessionKeySchema()));
+        }
+        return new RocksDBSessionStore(
+            new RocksDBSegmentedBytesStore(
+                name,
+                metricsScope(),
+                retentionPeriod,
+                segmentIntervalMs(),
+                new SessionKeySchema()));
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
index 60cd710e6a3..25473fb97d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
@@ -24,13 +24,22 @@ public class RocksDbTimeOrderedSessionBytesStoreSupplier 
implements SessionBytes
     private final String name;
     private final long retentionPeriod;
     private final boolean withIndex;
+    private final boolean withHeaders;
 
     public RocksDbTimeOrderedSessionBytesStoreSupplier(final String name,
                                                        final long 
retentionPeriod,
                                                        final boolean 
withIndex) {
+        this(name, retentionPeriod, withIndex, false);
+    }
+
+    public RocksDbTimeOrderedSessionBytesStoreSupplier(final String name,
+                                                       final long 
retentionPeriod,
+                                                       final boolean withIndex,
+                                                       final boolean 
withHeaders) {
         this.name = name;
         this.retentionPeriod = retentionPeriod;
         this.withIndex = withIndex;
+        this.withHeaders = withHeaders;
     }
 
     @Override
@@ -40,15 +49,18 @@ public class RocksDbTimeOrderedSessionBytesStoreSupplier 
implements SessionBytes
 
     @Override
     public SessionStore<Bytes, byte[]> get() {
-        return new RocksDBTimeOrderedSessionStore(
+        final RocksDBTimeOrderedSessionSegmentedBytesStore bytesStore =
             new RocksDBTimeOrderedSessionSegmentedBytesStore(
                 name,
                 metricsScope(),
                 retentionPeriod,
                 segmentIntervalMs(),
                 withIndex
-            )
-        );
+            );
+        if (withHeaders) {
+            return new RocksDBTimeOrderedSessionStoreWithHeaders(bytesStore);
+        }
+        return new RocksDBTimeOrderedSessionStore(bytesStore);
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionRocksDBSegmentedBytesStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionRocksDBSegmentedBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..e134e8f4d7d
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionRocksDBSegmentedBytesStoreWithHeaders.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * Segmented bytes store backed by RocksDB, with headers support.
+ * <p>
+ * Segments are managed by a {@link SessionSegmentsWithHeaders} instance. Each segment is a
+ * {@link SessionSegmentWithHeaders}, which extends
+ * {@link RocksDBMigratingSessionStoreWithHeaders} and therefore brings the dual-CF
+ * migration support from plain key-value format to headers format.
+ */
+public class SessionRocksDBSegmentedBytesStoreWithHeaders extends AbstractRocksDBSegmentedBytesStore<SessionSegmentWithHeaders> {
+
+    SessionRocksDBSegmentedBytesStoreWithHeaders(final String name,
+                                                 final String metricsScope,
+                                                 final long retention,
+                                                 final long segmentInterval,
+                                                 final KeySchema keySchema) {
+        super(
+            name,
+            retention,
+            keySchema,
+            new SessionSegmentsWithHeaders(name, metricsScope, retention, segmentInterval)
+        );
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeaders.java
new file mode 100644
index 00000000000..f93dfdbf2ab
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeaders.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A single segment of a header-aware session store.
+ * <p>
+ * The segment is itself a {@link RocksDBMigratingSessionStoreWithHeaders}, so it inherits
+ * the dual-column-family migration from plain key-value format to headers format.
+ * Two segments are considered equal when they are of the same class and share the same id.
+ */
+class SessionSegmentWithHeaders extends RocksDBMigratingSessionStoreWithHeaders implements Segment {
+
+    public final long id;
+
+    SessionSegmentWithHeaders(final String segmentName,
+                              final String windowName,
+                              final long id,
+                              final Position position,
+                              final RocksDBMetricsRecorder metricsRecorder) {
+        super(segmentName, windowName, metricsRecorder);
+        this.id = id;
+        this.position = position;
+    }
+
+    /** Deletes this segment's database directory. */
+    @Override
+    public void destroy() throws IOException {
+        Utils.delete(dbDir);
+    }
+
+    @Override
+    public long id() {
+        return id;
+    }
+
+    /** Range deletes are not supported on this segment type. */
+    @Override
+    public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void openDB(final Map<String, Object> configs, final File stateDir) {
+        // only open the underlying DB here; the registering step is skipped
+        super.openDB(configs, stateDir);
+    }
+
+    @Override
+    public String toString() {
+        return "SessionSegmentWithHeaders(id=" + id + ", name=" + name() + ")";
+    }
+
+    @Override
+    public boolean equals(final Object obj) {
+        if (obj != null && getClass() == obj.getClass()) {
+            final SessionSegmentWithHeaders other = (SessionSegmentWithHeaders) obj;
+            return other.id == id;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
new file mode 100644
index 00000000000..9d940a26453
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+/**
+ * Container for the {@link SessionSegmentWithHeaders} instances backing a
+ * {@link SessionRocksDBSegmentedBytesStoreWithHeaders}.
+ */
+class SessionSegmentsWithHeaders extends AbstractSegments<SessionSegmentWithHeaders> {
+
+    private final RocksDBMetricsRecorder metricsRecorder;
+
+    SessionSegmentsWithHeaders(final String name,
+                               final String metricsScope,
+                               final long retentionPeriod,
+                               final long segmentInterval) {
+        super(name, retentionPeriod, segmentInterval);
+        this.metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+    }
+
+    /** Creates a new segment that shares this container's position and metrics recorder. */
+    @Override
+    protected SessionSegmentWithHeaders createSegment(final long segmentId,
+                                                      final String segmentName) {
+        return new SessionSegmentWithHeaders(
+            segmentName,
+            name,
+            segmentId,
+            position,
+            metricsRecorder
+        );
+    }
+
+    /** Opens the segment's DB using the application configs and state directory of the context. */
+    @Override
+    protected void openSegmentDB(final SessionSegmentWithHeaders segment,
+                                 final StateStoreContext context) {
+        segment.openDB(context.appConfigs(), context.stateDir());
+    }
+
+    /**
+     * Delegates to the parent implementation and afterwards cleans up segments that have
+     * expired with respect to {@code streamTime}.
+     */
+    @Override
+    public SessionSegmentWithHeaders getOrCreateSegmentIfLive(final long segmentId,
+                                                              final StateStoreContext context,
+                                                              final long streamTime) {
+        final SessionSegmentWithHeaders liveSegment =
+            super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+        cleanupExpiredSegments(streamTime);
+        return liveSegment;
+    }
+
+    /** Initializes the metrics recorder for the task before opening the existing segments. */
+    @Override
+    public void openExisting(final StateStoreContext context, final long streamTime) {
+        metricsRecorder.init(ProcessorContextUtils.metricsImpl(context), context.taskId());
+        super.openExisting(context, streamTime);
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 18b7dd89003..d70bcc964db 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -85,8 +85,11 @@ public abstract class AbstractSessionBytesStoreTest {
     
     enum StoreType {
         RocksDBSessionStore,
+        RocksDBSessionStoreWithHeaders,
         RocksDBTimeOrderedSessionStoreWithIndex,
         RocksDBTimeOrderedSessionStoreWithoutIndex,
+        RocksDBTimeOrderedSessionStoreWithHeadersWithIndex,
+        RocksDBTimeOrderedSessionStoreWithHeadersWithoutIndex,
         InMemoryStore
     }
 
@@ -130,6 +133,49 @@ public abstract class AbstractSessionBytesStoreTest {
                         valueSerde
                 ).build();
             }
+            case RocksDBSessionStoreWithHeaders: {
+                return Stores.sessionStoreBuilder(
+                        new 
RocksDbSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, retentionPeriod) {
+                            @Override
+                            public SessionStore<Bytes, byte[]> get() {
+                                return new RocksDBSessionStoreWithHeaders(
+                                    new RocksDBSegmentedBytesStore(
+                                        name(), metricsScope(), 
retentionPeriod(), segmentIntervalMs(),
+                                        new SessionKeySchema()));
+                            }
+                        },
+                        keySerde,
+                        valueSerde
+                ).build();
+            }
+            case RocksDBTimeOrderedSessionStoreWithHeadersWithIndex: {
+                return Stores.sessionStoreBuilder(
+                        new 
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, 
retentionPeriod, true) {
+                            @Override
+                            public SessionStore<Bytes, byte[]> get() {
+                                return new 
RocksDBTimeOrderedSessionStoreWithHeaders(
+                                    new 
RocksDBTimeOrderedSessionSegmentedBytesStore(
+                                        name(), metricsScope(), 
retentionPeriod(), segmentIntervalMs(), true));
+                            }
+                        },
+                        keySerde,
+                        valueSerde
+                ).build();
+            }
+            case RocksDBTimeOrderedSessionStoreWithHeadersWithoutIndex: {
+                return Stores.sessionStoreBuilder(
+                        new 
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, 
retentionPeriod, false) {
+                            @Override
+                            public SessionStore<Bytes, byte[]> get() {
+                                return new 
RocksDBTimeOrderedSessionStoreWithHeaders(
+                                    new 
RocksDBTimeOrderedSessionSegmentedBytesStore(
+                                        name(), metricsScope(), 
retentionPeriod(), segmentIntervalMs(), false));
+                            }
+                        },
+                        keySerde,
+                        valueSerde
+                ).build();
+            }
             case InMemoryStore: {
                 return Stores.sessionStoreBuilder(
                         Stores.inMemorySessionStore(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
new file mode 100644
index 00000000000..735ffe4167e
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RocksDBMigratingSessionStoreWithHeadersTest extends 
RocksDBStoreTest {
+
+    // Serializes plain String prefixes, used for prefixScan() in the iterator checks.
+    private final Serializer<String> stringSerializer = new StringSerializer();
+    // Produces the headers-aware value bytes that the tests write through the store.
+    private final AggregationWithHeadersSerializer<String> aggSerializer = new AggregationWithHeadersSerializer<>(
+        new StringSerializer());
+    // Reads headers-aware value bytes back into an AggregationWithHeaders for assertions.
+    private final AggregationWithHeadersDeserializer<String> aggDeserializer = new AggregationWithHeadersDeserializer<>(
+        new StringDeserializer());
+    // Column family name passed to RocksDB when the tests open the database directly.
+    private final byte[] sessionStoreHeaderColumnFamilyName =
+        RocksDBMigratingSessionStoreWithHeaders.SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME;
+
+    /**
+     * Supplies the store under test: a {@link RocksDBMigratingSessionStoreWithHeaders}
+     * created with {@code DB_NAME} and {@code METRICS_SCOPE}.
+     */
+    RocksDBStore getRocksDBStore() {
+        final RocksDBStore migratingStore = new RocksDBMigratingSessionStoreWithHeaders(DB_NAME, METRICS_SCOPE);
+        return migratingStore;
+    }
+
+    /**
+     * Initializes the store without writing anything beforehand and checks that it
+     * logs "regular mode" and that {@code all()} yields no entries.
+     */
+    @Test
+    public void shouldOpenNewStoreInRegularMode() {
+        final String regularModeLogLine = "Opening store " + DB_NAME + " in regular mode";
+
+        try (final LogCaptureAppender logCapture =
+                 LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            final boolean loggedRegularMode = logCapture.getMessages().contains(regularModeLogLine);
+            assertTrue(loggedRegularMode);
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> entries = rocksDBStore.all()) {
+            final boolean hasEntry = entries.hasNext();
+            assertFalse(hasEntry);
+        }
+    }
+
+    /**
+     * Writes one entry through the store under test, closes it, re-opens it and checks that
+     * the re-open logs "regular mode". Afterwards the RocksDB instance is opened directly to
+     * verify that the default column family holds nothing for the key and that the headers
+     * column family holds a value of the written length.
+     *
+     * @throws Exception if the raw RocksDB instance cannot be opened or read
+     */
+    @Test
+    public void shouldOpenExistingStoreInRegularMode() throws Exception {
+        // Explicit charset so written and looked-up bytes never depend on the platform default
+        final byte[] keyBytes = "key".getBytes(StandardCharsets.UTF_8);
+        final byte[] valueBytes = "withHeaders".getBytes(StandardCharsets.UTF_8);
+        // prepare store
+        rocksDBStore.init(context, rocksDBStore);
+        rocksDBStore.put(new Bytes(keyBytes), valueBytes);
+        rocksDBStore.close();
+
+        // re-open store
+        try (final LogCaptureAppender appender =
+                 LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in regular mode"));
+        } finally {
+            rocksDBStore.close();
+        }
+
+        // verify store
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+                new ColumnFamilyDescriptor(sessionStoreHeaderColumnFamilyName, columnFamilyOptions));
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+
+        RocksDB db = null;
+        ColumnFamilyHandle defaultColumnFamily = null;
+        ColumnFamilyHandle headersColumnFamily = null;
+        try {
+            db = RocksDB.open(
+                    dbOptions,
+                    new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+                    columnFamilyDescriptors,
+                    columnFamilies);
+
+            defaultColumnFamily = columnFamilies.get(0);
+            headersColumnFamily = columnFamilies.get(1);
+
+            assertNull(db.get(defaultColumnFamily, keyBytes));
+            assertEquals(0L, db.getLongProperty(defaultColumnFamily, "rocksdb.estimate-num-keys"));
+
+            final byte[] storedValue = db.get(headersColumnFamily, keyBytes);
+            // Guard first: a missing entry should fail as an assertion, not as an NPE on .length
+            assertTrue(storedValue != null, "Expected the entry to be present in the headers column family");
+            assertEquals(valueBytes.length, storedValue.length);
+            assertEquals(1L, db.getLongProperty(headersColumnFamily, "rocksdb.estimate-num-keys"));
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
+            closeColumnFamilies(db, defaultColumnFamily, headersColumnFamily);
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    /**
+     * Seeds the default column family through a plain {@code RocksDBStore}, opens the same
+     * database with the store under test and exercises get/put/putIfAbsent/delete, then the
+     * iterators, and finally verifies the on-disk state of both column families.
+     *
+     * @throws Exception if the raw RocksDB verification fails to open or read the database
+     */
+    @Test
+    public void shouldMigrateFromDefaultToHeadersAwareColumnFamily() throws Exception {
+        prepareDefaultStore();
+
+        // Legacy data in the DEFAULT CF must make the store under test open in upgrade mode
+        final String upgradeModeLogLine = "Opening store " + DB_NAME + " in upgrade mode";
+        try (final LogCaptureAppender logCapture =
+                 LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            assertTrue(logCapture.getMessages().contains(upgradeModeLogLine));
+        }
+
+        // get(): an unknown key stays null, a legacy value comes back in headers-aware format
+        final Bytes unknownKey = new Bytes("unknown".getBytes());
+        assertNull(rocksDBStore.get(unknownKey), "Expected null for unknown key");
+
+        final Bytes firstKey = new Bytes("key1".getBytes());
+        assertMigratedValue(rocksDBStore.get(firstKey), "1");
+
+        // put(): overwrite with headers, delete via null value, insert a new key with empty headers
+        rocksDBStore.put(new Bytes("key2".getBytes()), serializeAggWithHeaders("22", testHeaders()));
+        rocksDBStore.put(new Bytes("key3".getBytes()), null);
+        rocksDBStore.put(new Bytes("key8new".getBytes()), serializeAggWithHeaders("88888888", new RecordHeaders()));
+
+        // putIfAbsent(): new key, existing legacy key with null value, new key with null value
+        final byte[] previousForKey11 = rocksDBStore.putIfAbsent(
+            new Bytes("key11new".getBytes()),
+            serializeAggWithHeaders("11111111111", testHeaders()));
+        assertNull(previousForKey11, "Expected null return value for putIfAbsent on non-existing key11new");
+
+        final byte[] previousForKey5 = rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()), null);
+        assertMigratedValue(previousForKey5, "55555");
+
+        final byte[] previousForKey12 = rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), null);
+        assertNull(previousForKey12);
+
+        // delete(): returns the legacy value in headers-aware format
+        final byte[] deletedKey6 = rocksDBStore.delete(new Bytes("key6".getBytes()));
+        assertMigratedValue(deletedKey6, "666666");
+
+        // iterators should not trigger migration (read-only)
+        iteratorsShouldNotMigrateData();
+
+        rocksDBStore.close();
+
+        // Verify the final state of both column families
+        verifyOldAndNewColumnFamily();
+    }
+
+    /**
+     * Walks all(), range(), reverseAll(), reverseRange() and prefixScan() and checks the exact
+     * sequence of keys and values each one returns. Entries asserted with
+     * {@link #assertNextMigrated} carry no headers; entries asserted with
+     * {@link #assertNextWithHeaders} carry {@link #testHeaders()}.
+     */
+    private void iteratorsShouldNotMigrateData() {
+        // iterating should not migrate any data, but return all keys over both CFs
+        // Values from DEFAULT CF are converted to header-aware format on the fly via convertToHeaderFormat
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all()) {
+            assertNextMigrated(itAll, "key1", "1"); // migrated by get()
+            assertNextWithHeaders(itAll, "key11new", "11111111111"); // inserted by putIfAbsent()
+            assertNextWithHeaders(itAll, "key2", "22"); // replaced by put()
+            assertNextMigrated(itAll, "key4", "4444"); // not migrated, on-the-fly conversion
+            assertNextMigrated(itAll, "key5", "55555"); // migrated by putIfAbsent with null value
+            assertNextMigrated(itAll, "key7", "7777777"); // not migrated, on-the-fly conversion
+            assertNextMigrated(itAll, "key8new", "88888888"); // inserted by put()
+            assertFalse(itAll.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                 rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) {
+            assertNextWithHeaders(it, "key2", "22");
+            assertNextMigrated(it, "key4", "4444");
+            assertNextMigrated(it, "key5", "55555");
+            assertFalse(it.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.reverseAll()) {
+            assertNextMigrated(itAll, "key8new", "88888888");
+            assertNextMigrated(itAll, "key7", "7777777");
+            assertNextMigrated(itAll, "key5", "55555");
+            assertNextMigrated(itAll, "key4", "4444");
+            assertNextWithHeaders(itAll, "key2", "22");
+            assertNextWithHeaders(itAll, "key11new", "11111111111");
+            assertNextMigrated(itAll, "key1", "1");
+            assertFalse(itAll.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                 rocksDBStore.reverseRange(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) {
+            assertNextMigrated(it, "key5", "55555");
+            assertNextMigrated(it, "key4", "4444");
+            assertNextWithHeaders(it, "key2", "22");
+            assertFalse(it.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it = rocksDBStore.prefixScan("key1", stringSerializer)) {
+            assertNextMigrated(it, "key1", "1");
+            assertNextWithHeaders(it, "key11new", "11111111111");
+            assertFalse(it.hasNext());
+        }
+    }
+
+    /**
+     * Advances {@code iterator} by one entry and asserts that it has the given key and a
+     * header-less value whose raw aggregation equals {@code expectedAggregation}.
+     */
+    private void assertNextMigrated(
+            final KeyValueIterator<Bytes, byte[]> iterator,
+            final String expectedKey,
+            final String expectedAggregation) {
+        final KeyValue<Bytes, byte[]> keyValue = iterator.next();
+        // getBytes() without charset on purpose: keys are written the same way elsewhere in this test
+        assertArrayEquals(expectedKey.getBytes(), keyValue.key.get(), "Unexpected key, expected " + expectedKey);
+        assertMigratedValue(keyValue.value, expectedAggregation);
+    }
+
+    /**
+     * Advances {@code iterator} by one entry and asserts that it has the given key and a
+     * value that deserializes to {@code expectedAggregation} with {@link #testHeaders()}.
+     */
+    private void assertNextWithHeaders(
+            final KeyValueIterator<Bytes, byte[]> iterator,
+            final String expectedKey,
+            final String expectedAggregation) {
+        final KeyValue<Bytes, byte[]> keyValue = iterator.next();
+        // getBytes() without charset on purpose: keys are written the same way elsewhere in this test
+        assertArrayEquals(expectedKey.getBytes(), keyValue.key.get(), "Unexpected key, expected " + expectedKey);
+        assertValueWithHeaders(keyValue.value, expectedAggregation, testHeaders());
+    }
+
+    /**
+     * Runs the post-migration checks in a fixed order: inspect both column families, confirm
+     * a re-open still logs upgrade mode, remove the remaining legacy keys, then confirm a
+     * re-open logs regular mode. Each step relies on the state left by the previous one.
+     *
+     * @throws Exception if the database cannot be opened directly
+     */
+    private void verifyOldAndNewColumnFamily() throws Exception {
+        // 1) on-disk contents of DEFAULT and headers CF
+        verifyColumnFamilyContents();
+        // 2) legacy keys remain in DEFAULT CF, so the store must still open in upgrade mode
+        verifyStillInUpgradeMode();
+        // 3) delete the remaining legacy keys directly in RocksDB
+        clearDefaultColumnFamily();
+        // 4) with the legacy keys deleted, the store must open in regular mode
+        verifyInRegularMode();
+    }
+
+    /**
+     * Opens the database directly with RocksDB (default + headers column family) and delegates
+     * the content checks to {@link #verifyDefaultColumnFamily} and {@link #verifyHeadersColumnFamily}.
+     * All native resources are released in the finally block.
+     *
+     * @throws Exception if the database cannot be opened or read
+     */
+    private void verifyColumnFamilyContents() throws Exception {
+        final DBOptions options = new DBOptions();
+        final ColumnFamilyOptions cfOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> descriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, cfOptions),
+            new ColumnFamilyDescriptor(sessionStoreHeaderColumnFamilyName, cfOptions));
+        // RocksDB.open() fills this list with one handle per descriptor, in descriptor order
+        final List<ColumnFamilyHandle> handles = new ArrayList<>(descriptors.size());
+
+        RocksDB rawDb = null;
+        ColumnFamilyHandle defaultHandle = null;
+        ColumnFamilyHandle headersHandle = null;
+        try {
+            final File rocksDbDir = new File(context.stateDir(), "rocksdb");
+            final String dbPath = new File(rocksDbDir, DB_NAME).getAbsolutePath();
+            rawDb = RocksDB.open(options, dbPath, descriptors, handles);
+
+            defaultHandle = handles.get(0);
+            headersHandle = handles.get(1);
+
+            verifyDefaultColumnFamily(rawDb, defaultHandle);
+            verifyHeadersColumnFamily(rawDb, headersHandle);
+        } finally {
+            // handles and DB first, then the options they were opened with
+            closeColumnFamilies(rawDb, defaultHandle, headersHandle);
+            options.close();
+            cfOptions.close();
+        }
+    }
+
+    /**
+     * Checks the DEFAULT column family key by key: un-migrated keys must still hold their plain
+     * aggregation bytes, every other key must be absent.
+     *
+     * @param db                  the directly opened RocksDB instance
+     * @param defaultColumnFamily handle of the DEFAULT column family
+     * @throws Exception if a read fails
+     */
+    private void verifyDefaultColumnFamily(final RocksDB db, final ColumnFamilyHandle defaultColumnFamily) throws Exception {
+        // {key, expected plain value}; a null expected value means the key must be absent
+        final String[][] expectedByKey = {
+            {"unknown", null},
+            {"key1", null}, // migrated by get()
+            {"key2", null}, // migrated by put()
+            {"key3", null}, // deleted
+            {"key4", "4444"}, // not migrated, plain aggregation
+            {"key5", null}, // migrated by putIfAbsent()
+            {"key6", null}, // migrated by delete()
+            {"key7", "7777777"}, // not migrated, plain aggregation
+            {"key8new", null},
+            {"key11new", null}
+        };
+
+        for (final String[] expectation : expectedByKey) {
+            final byte[] stored = db.get(defaultColumnFamily, expectation[0].getBytes());
+            if (expectation[1] == null) {
+                assertNull(stored);
+            } else {
+                assertArrayEquals(expectation[1].getBytes(), stored);
+            }
+        }
+    }
+
+    /**
+     * Checks the headers column family key by key: migrated and newly written keys must be
+     * present in headers-aware format, deleted and un-migrated keys must be absent.
+     *
+     * @param db                  the directly opened RocksDB instance
+     * @param headersColumnFamily handle of the headers column family
+     * @throws Exception if a read fails
+     */
+    private void verifyHeadersColumnFamily(final RocksDB db, final ColumnFamilyHandle headersColumnFamily) throws Exception {
+        final byte[] unknown = db.get(headersColumnFamily, "unknown".getBytes());
+        assertNull(unknown);
+
+        // migrated by get()
+        final byte[] key1 = db.get(headersColumnFamily, "key1".getBytes());
+        assertMigratedValue(key1, "1");
+
+        // put with headers
+        final byte[] key2 = db.get(headersColumnFamily, "key2".getBytes());
+        assertValueWithHeaders(key2, "22", testHeaders());
+
+        // put with null value => deleted
+        final byte[] key3 = db.get(headersColumnFamily, "key3".getBytes());
+        assertNull(key3);
+
+        // not migrated, still in DEFAULT CF
+        final byte[] key4 = db.get(headersColumnFamily, "key4".getBytes());
+        assertNull(key4);
+
+        // migrated by putIfAbsent
+        final byte[] key5 = db.get(headersColumnFamily, "key5".getBytes());
+        assertMigratedValue(key5, "55555");
+
+        // migrated by delete() => deleted
+        final byte[] key6 = db.get(headersColumnFamily, "key6".getBytes());
+        assertNull(key6);
+
+        // not migrated, still in DEFAULT CF
+        final byte[] key7 = db.get(headersColumnFamily, "key7".getBytes());
+        assertNull(key7);
+
+        // put with empty headers
+        final byte[] key8 = db.get(headersColumnFamily, "key8new".getBytes());
+        assertMigratedValue(key8, "88888888");
+
+        // putIfAbsent with headers
+        final byte[] key11 = db.get(headersColumnFamily, "key11new".getBytes());
+        assertValueWithHeaders(key11, "11111111111", testHeaders());
+
+        // putIfAbsent with null value on non-existing key
+        final byte[] key12 = db.get(headersColumnFamily, "key12new".getBytes());
+        assertNull(key12);
+    }
+
+    /**
+     * Closes the two column family handles and then the database, skipping any argument
+     * that is {@code null} (e.g. because opening the database failed part-way).
+     *
+     * @param db                  the RocksDB instance, may be {@code null}
+     * @param defaultColumnFamily handle of the DEFAULT column family, may be {@code null}
+     * @param headersColumnFamily handle of the headers column family, may be {@code null}
+     */
+    private void closeColumnFamilies(
+            final RocksDB db,
+            final ColumnFamilyHandle defaultColumnFamily,
+            final ColumnFamilyHandle headersColumnFamily) {
+        // Order of closing must follow: ColumnFamilyHandle > RocksDB
+        for (final ColumnFamilyHandle handle : asList(defaultColumnFamily, headersColumnFamily)) {
+            if (handle != null) {
+                handle.close();
+            }
+        }
+        if (db == null) {
+            return;
+        }
+        db.close();
+    }
+
+    /**
+     * Re-opens the store under test and asserts that it logs "upgrade mode"; the store is
+     * closed again afterwards, whether or not the assertion holds.
+     */
+    private void verifyStillInUpgradeMode() {
+        final String upgradeModeLogLine = "Opening store " + DB_NAME + " in upgrade mode";
+
+        try (final LogCaptureAppender logCapture =
+                 LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            final boolean loggedUpgradeMode = logCapture.getMessages().contains(upgradeModeLogLine);
+            assertTrue(loggedUpgradeMode);
+        } finally {
+            rocksDBStore.close();
+        }
+    }
+
+    /**
+     * Opens the database directly with RocksDB and deletes the two keys ("key4", "key7") that
+     * the migration test leaves in the DEFAULT column family. The headers column family is
+     * opened alongside it but not modified.
+     *
+     * @throws Exception if the database cannot be opened or the deletes fail
+     */
+    private void clearDefaultColumnFamily() throws Exception {
+        // clear DEFAULT CF by deleting remaining keys
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+                new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+                new ColumnFamilyDescriptor(sessionStoreHeaderColumnFamilyName, columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                    dbOptions,
+                    new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+                    columnFamilyDescriptors,
+                    columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            // kept only so the handle can be closed below
+            headersCF = columnFamilies.get(1);
+            db.delete(defaultCF, "key4".getBytes());
+            db.delete(defaultCF, "key7".getBytes());
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
+            // Reuse the shared helper instead of repeating the null-checked close sequence.
+            closeColumnFamilies(db, defaultCF, headersCF);
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    /**
+     * Re-opens the store under test and asserts that it now logs "regular mode".
+     * The store is left open by this method.
+     */
+    private void verifyInRegularMode() {
+        // NOTE(review): unlike verifyStillInUpgradeMode() there is no close() here; presumably
+        // the base test's teardown closes rocksDBStore -- confirm.
+        final String regularModeLogLine = "Opening store " + DB_NAME + " in regular mode";
+
+        try (final LogCaptureAppender logCapture =
+                 LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
+            rocksDBStore.init(context, rocksDBStore);
+
+            final boolean loggedRegularMode = logCapture.getMessages().contains(regularModeLogLine);
+            assertTrue(loggedRegularMode);
+        }
+    }
+
+    /**
+     * Seeds the database through a plain {@code RocksDBStore}: keys "key1".."key7" are written
+     * with the plain values "1", "22", ..., "7777777" (simulating pre-headers store data).
+     * The store is closed afterwards.
+     */
+    private void prepareDefaultStore() {
+        // legacyValues[i] is stored under "key" + (i + 1)
+        final String[] legacyValues = {"1", "22", "333", "4444", "55555", "666666", "7777777"};
+
+        final RocksDBStore legacyStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        try {
+            legacyStore.init(context, legacyStore);
+
+            for (int index = 0; index < legacyValues.length; index++) {
+                final String legacyKey = "key" + (index + 1);
+                legacyStore.put(new Bytes(legacyKey.getBytes()), legacyValues[index].getBytes());
+            }
+        } finally {
+            legacyStore.close();
+        }
+    }
+
+    /**
+     * Builds a fresh header set containing the single header {@code source=test}
+     * (value encoded as UTF-8).
+     */
+    private Headers testHeaders() {
+        final Header sourceHeader = new RecordHeader("source", "test".getBytes(StandardCharsets.UTF_8));
+        return new RecordHeaders().add(sourceHeader);
+    }
+
+    /**
+     * Wraps the aggregation and headers into an {@code AggregationWithHeaders} and serializes
+     * it with {@code aggSerializer} (the topic argument is passed as {@code null}).
+     *
+     * @param aggregation the aggregation value
+     * @param headers     the headers to attach
+     * @return the serialized bytes
+     */
+    private byte[] serializeAggWithHeaders(final String aggregation, final Headers headers) {
+        final AggregationWithHeaders<String> wrapped = AggregationWithHeaders.make(aggregation, headers);
+        return aggSerializer.serialize(null, wrapped);
+    }
+
+    /**
+     * Asserts that {@code value} carries no headers and that its raw aggregation bytes equal
+     * the UTF-8 bytes of {@code expectedAggregation}.
+     *
+     * @param value               serialized value as returned by the store or read from RocksDB
+     * @param expectedAggregation the aggregation expected inside the value
+     */
+    private void assertMigratedValue(final byte[] value, final String expectedAggregation) {
+        final boolean hasAnyHeader = AggregationWithHeadersDeserializer.headers(value).iterator().hasNext();
+        assertFalse(hasAnyHeader, "Migrated value should have empty headers");
+
+        final byte[] expectedRaw = expectedAggregation.getBytes(StandardCharsets.UTF_8);
+        final byte[] actualRaw = AggregationWithHeadersDeserializer.rawAggregation(value);
+        final String mismatchMessage = "Migrated value should preserve original aggregation: " + expectedAggregation;
+        assertArrayEquals(expectedRaw, actualRaw, mismatchMessage);
+    }
+
+    /**
+     * Deserializes {@code value} and asserts that the aggregation equals
+     * {@code expectedAggregation} and that every header in {@code expectedHeaders} is present
+     * with the same value. Additional headers on the deserialized value are not checked.
+     *
+     * @param value               serialized value as returned by the store or read from RocksDB
+     * @param expectedAggregation the aggregation expected inside the value
+     * @param expectedHeaders     headers that must be present on the value
+     */
+    private void assertValueWithHeaders(final byte[] value, final String expectedAggregation, final Headers expectedHeaders) {
+        final AggregationWithHeaders<String> deserialized = aggDeserializer.deserialize(null, value);
+        assertEquals(expectedAggregation, deserialized.aggregation());
+        for (final Header header : expectedHeaders) {
+            final Header actualHeader = deserialized.headers().lastHeader(header.key());
+            // Guard before dereferencing so a missing header is reported as a descriptive
+            // assertion failure rather than a NullPointerException.
+            assertTrue(actualHeader != null, "Expected header '" + header.key() + "' to be present");
+            assertArrayEquals(
+                header.value(),
+                actualHeader.value(),
+                "Expected header '" + header.key() + "' to match");
+        }
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
new file mode 100644
index 00000000000..2f9c522f368
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Verifies that {@code RocksDBSessionStoreWithHeaders.query(...)} is not supported.
+ */
+public class RocksDBSessionStoreWithHeadersTest {
+
+    @Test
+    public void shouldThrowUnsupportedOperationOnQuery() {
+        // The store is only constructed, never initialized; query() is called with null arguments.
+        final RocksDBSegmentedBytesStore bytesStore =
+            new RocksDBSegmentedBytesStore("test", "scope", 10_000L, 60_000L, new SessionKeySchema());
+        final RocksDBSessionStoreWithHeaders store = new RocksDBSessionStoreWithHeaders(bytesStore);
+
+        assertThrows(UnsupportedOperationException.class, () -> store.query(null, null, null));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
new file mode 100644
index 00000000000..761058193f1
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Verifies that {@code RocksDBTimeOrderedSessionStoreWithHeaders.query(...)} is not supported,
+ * both with and without the index.
+ */
+public class RocksDBTimeOrderedSessionStoreWithHeadersTest {
+
+    @Test
+    public void shouldThrowUnsupportedOperationOnQueryWithIndex() {
+        assertQueryUnsupported(true);
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationOnQueryWithOutIndex() {
+        assertQueryUnsupported(false);
+    }
+
+    /**
+     * Builds an un-initialized store on top of a time-ordered segmented bytes store created
+     * with the given index flag and asserts that query() with null arguments throws
+     * {@link UnsupportedOperationException}.
+     */
+    private void assertQueryUnsupported(final boolean withIndex) {
+        final RocksDBTimeOrderedSessionSegmentedBytesStore bytesStore =
+            new RocksDBTimeOrderedSessionSegmentedBytesStore("test", "scope", 10_000L, 60_000L, withIndex);
+        final RocksDBTimeOrderedSessionStoreWithHeaders store =
+            new RocksDBTimeOrderedSessionStoreWithHeaders(bytesStore);
+
+        assertThrows(UnsupportedOperationException.class, () -> store.query(null, null, null));
+    }
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeadersTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeadersTest.java
new file mode 100644
index 00000000000..1aeba768601
--- /dev/null
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeadersTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static 
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SessionSegmentWithHeaders}: removal of the segment directory on
+ * {@code destroy()} and the equals/hashCode/compareTo behaviour, which the assertions below
+ * tie to the numeric id passed as the third constructor argument.
+ */
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class SessionSegmentWithHeadersTest {
+
+    private final RocksDBMetricsRecorder metricsRecorder =
+        new RocksDBMetricsRecorder("metrics-scope", "store-name");
+
+    /** Initialises the shared metrics recorder with fresh metrics and {@code TaskId(0, 0)}. */
+    @BeforeEach
+    public void setUp() {
+        metricsRecorder.init(
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
+            new TaskId(0, 0)
+        );
+    }
+
+    /**
+     * Opens a segment under a temporary directory, checks that {@code <dir>/window/segment}
+     * exists and is non-empty, and then checks that {@code destroy()} removes the segment
+     * directory while leaving the parent {@code window} directory in place.
+     */
+    @Test
+    public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
+        final SessionSegmentWithHeaders segment =
+            new SessionSegmentWithHeaders("segment", "window", 0L, Position.emptyPosition(), metricsRecorder);
+        final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
+        final File directory = new File(directoryPath);
+        final File windowDirectory = new File(directoryPath, "window");
+        final File segmentDirectory = new File(directoryPath + File.separator + "window", "segment");
+
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
+        try {
+            assertTrue(windowDirectory.exists());
+            assertTrue(segmentDirectory.exists());
+            assertTrue(segmentDirectory.list().length > 0);
+            segment.destroy();
+            assertFalse(segmentDirectory.exists());
+            assertTrue(windowDirectory.exists());
+        } finally {
+            // Close the opened store even when one of the assertions above fails, so a
+            // failing run does not leave the store open behind it.
+            segment.close();
+        }
+    }
+
+    /**
+     * Segments built with the same id but different names must be equal; a different id,
+     * {@code null}, or an object of another type must not be.
+     */
+    @Test
+    public void shouldBeEqualIfIdIsEqual() {
+        final SessionSegmentWithHeaders segment =
+            new SessionSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentSameId =
+            new SessionSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentDifferentId =
+            new SessionSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
+
+        assertEquals(segment, segment);
+        assertEquals(segment, segmentSameId);
+        assertNotEquals(segment, segmentDifferentId);
+        assertNotEquals(segment, null);
+        assertNotEquals(segment, "anyName");
+
+        segment.close();
+        segmentSameId.close();
+        segmentDifferentId.close();
+    }
+
+    /**
+     * A {@link HashSet} must treat two segments with the same id as duplicates, regardless of
+     * their names, and must accept a segment with a different id.
+     */
+    @Test
+    public void shouldHashOnSegmentIdOnly() {
+        final SessionSegmentWithHeaders segment =
+            new SessionSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentSameId =
+            new SessionSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentDifferentId =
+            new SessionSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
+
+        final Set<SessionSegmentWithHeaders> set = new HashSet<>();
+        assertTrue(set.add(segment));
+        assertFalse(set.add(segmentSameId));
+        assertTrue(set.add(segmentDifferentId));
+
+        segment.close();
+        segmentSameId.close();
+        segmentDifferentId.close();
+    }
+
+    /**
+     * Ordering must follow the id (0 &lt; 50 &lt; 100) even though the two name arguments would
+     * sort differently.
+     */
+    @Test
+    public void shouldCompareSegmentIdOnly() {
+        final SessionSegmentWithHeaders segment1 =
+            new SessionSegmentWithHeaders("a", "C", 50L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segment2 =
+            new SessionSegmentWithHeaders("b", "B", 100L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segment3 =
+            new SessionSegmentWithHeaders("c", "A", 0L, Position.emptyPosition(), metricsRecorder);
+
+        // The Comparable contract only fixes the sign of the result, so normalise with signum
+        // rather than depending on the implementation returning exactly -1 or 1.
+        assertEquals(0, segment1.compareTo(segment1));
+        assertEquals(-1, Integer.signum(segment1.compareTo(segment2)));
+        assertEquals(1, Integer.signum(segment2.compareTo(segment1)));
+        assertEquals(1, Integer.signum(segment1.compareTo(segment3)));
+        assertEquals(-1, Integer.signum(segment3.compareTo(segment1)));
+        assertEquals(1, Integer.signum(segment2.compareTo(segment3)));
+        assertEquals(-1, Integer.signum(segment3.compareTo(segment2)));
+
+        segment1.close();
+        segment2.close();
+        segment3.close();
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
new file mode 100644
index 00000000000..eea7d60a16f
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SessionSegmentsWithHeaders}: segment id and name computation, creation on
+ * disk, expiry-driven cleanup, reopening of existing segments, range lookup and closing.
+ */
+public class SessionSegmentsWithHeadersTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String METRICS_SCOPE = "test-state-id";
+    private final String storeName = "test";
+    private File stateDirectory;
+    private InternalMockProcessorContext context;
+    private SessionSegmentsWithHeaders segments;
+
+    /**
+     * Builds a mock processor context rooted in a fresh temporary directory and opens an
+     * instance of the class under test against it.
+     */
+    @BeforeEach
+    public void createContext() {
+        stateDirectory = TestUtils.tempDirectory();
+        final ThreadCache cache =
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()));
+        context = new InternalMockProcessorContext<>(
+            stateDirectory,
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            cache
+        );
+        segments = new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments.openExisting(context, -1L);
+    }
+
+    /** Closes whichever instance the {@code segments} field refers to at the end of the test. */
+    @AfterEach
+    public void close() {
+        segments.close();
+    }
+
+    /** A timestamp at the start of the n-th interval must map to segment id n. */
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        for (int expectedId = 0; expectedId <= 3; expectedId++) {
+            assertEquals(expectedId, segments.segmentId(expectedId * SEGMENT_INTERVAL));
+        }
+    }
+
+    /** The segment name must be the store name followed by the id times the segment interval. */
+    @Test
+    public void shouldGetSegmentNameFromId() {
+        final String nameOfFirst = segments.segmentName(0);
+        final String nameOfSecond = segments.segmentName(1);
+        final String nameOfThird = segments.segmentName(2);
+
+        assertEquals("test.0", nameOfFirst);
+        assertEquals("test." + SEGMENT_INTERVAL, nameOfSecond);
+        assertEquals("test." + 2 * SEGMENT_INTERVAL, nameOfThird);
+    }
+
+    /** Creating segments 0, 1 and 2 must yield one directory per segment, each reported open. */
+    @Test
+    public void shouldCreateSegments() {
+        final SessionSegmentWithHeaders first = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final SessionSegmentWithHeaders second = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final SessionSegmentWithHeaders third = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        final File firstDir = new File(context.stateDir(), "test/test.0");
+        assertTrue(firstDir.isDirectory());
+        final File secondDir = new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL);
+        assertTrue(secondDir.isDirectory());
+        final File thirdDir = new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL);
+        assertTrue(thirdDir.isDirectory());
+
+        assertTrue(first.isOpen());
+        assertTrue(second.isOpen());
+        assertTrue(third.isOpen());
+    }
+
+    /**
+     * After stream time has advanced to segment 7, requesting segment 0 must return
+     * {@code null} and must not create its directory.
+     */
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        final long advancedStreamTime = updateStreamTimeAndCreateSegment(7);
+
+        final SessionSegmentWithHeaders expired = segments.getOrCreateSegmentIfLive(0, context, advancedStreamTime);
+
+        assertNull(expired);
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
+    }
+
+    /**
+     * Creating segment 7 at stream time {@code 7 * SEGMENT_INTERVAL} must close segments 0 and 1
+     * and remove their directories, leaving only segment 7 open and on disk.
+     */
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final SessionSegmentWithHeaders oldest = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final SessionSegmentWithHeaders older = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final SessionSegmentWithHeaders newest = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        assertFalse(oldest.isOpen());
+        assertFalse(older.isOpen());
+        assertTrue(newest.isOpen());
+
+        final File oldestDir = new File(context.stateDir(), "test/test.0");
+        assertFalse(oldestDir.exists());
+        final File olderDir = new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL);
+        assertFalse(olderDir.exists());
+        final File newestDir = new File(context.stateDir(), "test/test." + 7 * SEGMENT_INTERVAL);
+        assertTrue(newestDir.exists());
+    }
+
+    /** Looking up timestamp 0 must return the segment that was created with id 0. */
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final SessionSegmentWithHeaders created = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+
+        final SessionSegmentWithHeaders lookedUp = segments.segmentForTimestamp(0L);
+
+        assertEquals(created, lookedUp);
+    }
+
+    /** {@code toString()} of a segment must report its class name, id and name. */
+    @Test
+    public void shouldGetCorrectSegmentString() {
+        final SessionSegmentWithHeaders created = segments.getOrCreateSegmentIfLive(0, context, -1L);
+
+        final String description = created.toString();
+
+        assertEquals("SessionSegmentWithHeaders(id=0, name=test.0)", description);
+    }
+
+    /** Closing the container must close every segment it created. */
+    @Test
+    public void shouldCloseAllOpenSegments() {
+        final SessionSegmentWithHeaders first = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final SessionSegmentWithHeaders second = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final SessionSegmentWithHeaders third = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        segments.close();
+
+        assertFalse(first.isOpen());
+        assertFalse(second.isOpen());
+        assertFalse(third.isOpen());
+    }
+
+    /**
+     * Segments 0 to 4 created by one instance and then closed must be found open again by a
+     * second instance, built with the same arguments, once it calls {@code openExisting}.
+     */
+    @Test
+    public void shouldOpenExistingSegments() {
+        segments = new SessionSegmentsWithHeaders("test", METRICS_SCOPE, 4, 1);
+        segments.openExisting(context, -1L);
+        for (int id = 0; id <= 4; id++) {
+            segments.getOrCreateSegmentIfLive(id, context, -1L);
+        }
+        // Close the writer instance before a fresh one picks up what is on disk.
+        segments.close();
+
+        segments = new SessionSegmentsWithHeaders("test", METRICS_SCOPE, 4, 1);
+        segments.openExisting(context, -1L);
+
+        for (int timestamp = 0; timestamp <= 4; timestamp++) {
+            assertTrue(segments.segmentForTimestamp(timestamp).isOpen());
+        }
+    }
+
+    /**
+     * With segments 0 to 4 present, querying the range {@code [0, 2 * SEGMENT_INTERVAL]} with
+     * the boolean argument set to {@code true} must return segments 0, 1 and 2 in that order.
+     */
+    @Test
+    public void shouldGetSegmentsWithinTimeRange() {
+        for (int id = 0; id < 4; id++) {
+            updateStreamTimeAndCreateSegment(id);
+        }
+        final long latestStreamTime = updateStreamTimeAndCreateSegment(4);
+        for (int id = 0; id <= 4; id++) {
+            segments.getOrCreateSegmentIfLive(id, context, latestStreamTime);
+        }
+
+        final List<SessionSegmentWithHeaders> inRange = segments.segments(0, 2 * SEGMENT_INTERVAL, true);
+
+        assertEquals(3, inRange.size());
+        for (int position = 0; position < 3; position++) {
+            assertEquals(position, inRange.get(position).id);
+        }
+    }
+
+    /** After closing, a lookup for a previously created segment must return {@code null}. */
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+
+        segments.close();
+
+        assertNull(segments.segmentForTimestamp(0));
+    }
+
+    /**
+     * Requests the given segment while passing {@code SEGMENT_INTERVAL * segment} as stream time.
+     *
+     * @param segment id of the segment to request
+     * @return the stream time that was passed along
+     */
+    private long updateStreamTimeAndCreateSegment(final int segment) {
+        final long observedStreamTime = SEGMENT_INTERVAL * segment;
+        segments.getOrCreateSegmentIfLive(segment, context, observedStreamTime);
+        return observedStreamTime;
+    }
+}

Reply via email to