This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c00d1c7cb8d KAFKA-20158 RocksDB session stores with headers support
(4/N) (#21571)
c00d1c7cb8d is described below
commit c00d1c7cb8db8b5a86b10ff2f15704daa902103c
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Feb 27 09:32:50 2026 -0500
KAFKA-20158 RocksDB session stores with headers support (4/N) (#21571)
Adds record-header support to `RocksDBSessionStore` for
[KIP-1271](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1271%3A+Allow+to+Store+Record+Headers+in+State+Stores)
and includes tests for lazily converting the storage format when a store
is opened in upgrade mode.
---
.../org/apache/kafka/streams/state/Stores.java | 27 +-
.../RocksDBMigratingSessionStoreWithHeaders.java | 86 ++++
.../internals/RocksDBSessionStoreWithHeaders.java | 45 ++
.../RocksDBTimeOrderedSessionStoreWithHeaders.java | 45 ++
.../RocksDbSessionBytesStoreSupplier.java | 31 +-
...ocksDbTimeOrderedSessionBytesStoreSupplier.java | 18 +-
...ssionRocksDBSegmentedBytesStoreWithHeaders.java | 36 ++
.../state/internals/SessionSegmentWithHeaders.java | 89 ++++
.../internals/SessionSegmentsWithHeaders.java | 62 +++
.../internals/AbstractSessionBytesStoreTest.java | 46 ++
...ocksDBMigratingSessionStoreWithHeadersTest.java | 512 +++++++++++++++++++++
.../RocksDBSessionStoreWithHeadersTest.java | 33 ++
...ksDBTimeOrderedSessionStoreWithHeadersTest.java | 42 ++
.../internals/SessionSegmentWithHeadersTest.java | 140 ++++++
.../internals/SessionSegmentsWithHeadersTest.java | 197 ++++++++
15 files changed, 1398 insertions(+), 11 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
index e36fe042b6a..b1e8234f3c4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java
@@ -440,13 +440,38 @@ public final class Stores {
*/
public static SessionBytesStoreSupplier persistentSessionStore(final
String name,
final
Duration retentionPeriod) {
+ return persistentSessionStore(name, retentionPeriod, false);
+ }
+
+ /**
+ * Create a persistent {@link SessionBytesStoreSupplier} with support for
record headers.
+ * <p>
+ * Note that it is not safe to change the value of {@code retentionPeriod}
between
+ * application restarts without clearing local state from application
instances,
+ * as this may cause incorrect values to be read from the state store if
it impacts
+ * the underlying storage format.
+ *
+ * @param name name of the store (cannot be {@code null})
+ * @param retentionPeriod length of time to retain data in the store
(cannot be negative)
+ * (note that the retention period must be at
least as long enough to
+ * contain the inactivity gap of the session and
the entire grace period.)
+ * @return an instance of a {@link SessionBytesStoreSupplier}
+ */
+ public static SessionBytesStoreSupplier
persistentSessionStoreWithHeaders(final String name,
+
final Duration retentionPeriod) {
+ return persistentSessionStore(name, retentionPeriod, true);
+ }
+
+ private static SessionBytesStoreSupplier persistentSessionStore(final
String name,
+ final
Duration retentionPeriod,
+ final
boolean withHeaders) {
Objects.requireNonNull(name, "name cannot be null");
final String msgPrefix =
prepareMillisCheckFailMsgPrefix(retentionPeriod, "retentionPeriod");
final long retentionPeriodMs =
validateMillisecondDuration(retentionPeriod, msgPrefix);
if (retentionPeriodMs < 0) {
throw new IllegalArgumentException("retentionPeriod cannot be
negative");
}
- return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs);
+ return new RocksDbSessionBytesStoreSupplier(name, retentionPeriodMs,
withHeaders);
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
new file mode 100644
index 00000000000..d0534915a06
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeaders.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+/**
+ * A persistent key-value store with headers support based on RocksDB.
+ * <p>
+ * This store provides a migration path from plain {@link RocksDBStore}
(DEFAULT column family)
+ * to a headers-aware column family ({@code sessionKeyValueWithHeaders}). It
uses
+ * {@link DualColumnFamilyAccessor} for lazy migration when legacy data exists
in the DEFAULT CF.
+ */
+public class RocksDBMigratingSessionStoreWithHeaders extends RocksDBStore
implements HeadersBytesStore {
+ private static final Logger log =
LoggerFactory.getLogger(RocksDBMigratingSessionStoreWithHeaders.class);
+
+ static final byte[] SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME =
"sessionKeyValueWithHeaders".getBytes(StandardCharsets.UTF_8);
+
+ public RocksDBMigratingSessionStoreWithHeaders(final String name,
+ final String metricsScope) {
+ super(name, metricsScope);
+ }
+
+ RocksDBMigratingSessionStoreWithHeaders(final String name,
+ final String parentDir,
+ final RocksDBMetricsRecorder
metricsRecorder) {
+ super(name, parentDir, metricsRecorder);
+ }
+
+ @Override
+ void openRocksDB(final DBOptions dbOptions,
+ final ColumnFamilyOptions columnFamilyOptions) {
+ final List<ColumnFamilyHandle> columnFamilies = openRocksDB(
+ dbOptions,
+ new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY,
columnFamilyOptions),
+ new
ColumnFamilyDescriptor(SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME,
columnFamilyOptions)
+ );
+ final ColumnFamilyHandle noHeadersColumnFamily = columnFamilies.get(0);
+ final ColumnFamilyHandle withHeadersColumnFamily =
columnFamilies.get(1);
+
+ final RocksIterator noHeadersIter =
db.newIterator(noHeadersColumnFamily);
+ noHeadersIter.seekToFirst();
+ if (noHeadersIter.isValid()) {
+ log.info("Opening store {} in upgrade mode", name);
+ cfAccessor = new DualColumnFamilyAccessor(
+ noHeadersColumnFamily,
+ withHeadersColumnFamily,
+ HeadersBytesStore::convertToHeaderFormat,
+ this
+ );
+ } else {
+ log.info("Opening store {} in regular mode", name);
+ cfAccessor = new
SingleColumnFamilyAccessor(withHeadersColumnFamily);
+ noHeadersColumnFamily.close();
+ }
+ noHeadersIter.close();
+ }
+
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
new file mode 100644
index 00000000000..75d0ed17eff
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+
+/**
+ * RocksDB-backed session store with support for record headers.
+ * <p>
+ * This store extends {@link RocksDBSessionStore} and overrides
+ * {@code query()} to disable IQv2 for header-aware stores.
+ * <p>
+ * The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
+ *
+ * @see RocksDBSessionStore
+ */
+class RocksDBSessionStoreWithHeaders extends RocksDBSessionStore implements
HeadersBytesStore {
+
+ RocksDBSessionStoreWithHeaders(final SegmentedBytesStore bytesStore) {
+ super(bytesStore);
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
+ throw new UnsupportedOperationException("Querying stores with headers
is not supported");
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
new file mode 100644
index 00000000000..2d5ab757804
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeaders.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
+
+/**
+ * RocksDB-backed time-ordered session store with support for record headers.
+ * <p>
+ * This store extends {@link RocksDBTimeOrderedSessionStore} and overrides
+ * {@code query()} to disable IQv2 for header-aware stores.
+ * <p>
+ * The storage format for values is:
[headersSize(varint)][headersBytes][aggregationBytes]
+ *
+ * @see RocksDBTimeOrderedSessionStore
+ */
+class RocksDBTimeOrderedSessionStoreWithHeaders extends
RocksDBTimeOrderedSessionStore implements HeadersBytesStore {
+
+ RocksDBTimeOrderedSessionStoreWithHeaders(final
RocksDBTimeOrderedSessionSegmentedBytesStore store) {
+ super(store);
+ }
+
+ @Override
+ public <R> QueryResult<R> query(final Query<R> query, final PositionBound
positionBound, final QueryConfig config) {
+ throw new UnsupportedOperationException("Querying stores with headers
is not supported");
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
index 684ebf41431..6bd542ecf93 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbSessionBytesStoreSupplier.java
@@ -23,11 +23,19 @@ import org.apache.kafka.streams.state.SessionStore;
public class RocksDbSessionBytesStoreSupplier implements
SessionBytesStoreSupplier {
private final String name;
private final long retentionPeriod;
+ private final boolean withHeaders;
    /**
     * Creates a supplier for a plain (no-headers) session store.
     *
     * @param name            name of the store
     * @param retentionPeriod retention period in milliseconds
     */
    public RocksDbSessionBytesStoreSupplier(final String name,
                                            final long retentionPeriod) {
        this(name, retentionPeriod, false);
    }

    /**
     * Creates a supplier, optionally enabling the header-aware storage format.
     *
     * @param name            name of the store
     * @param retentionPeriod retention period in milliseconds
     * @param withHeaders     whether {@link #get()} returns a header-aware store
     */
    public RocksDbSessionBytesStoreSupplier(final String name,
                                            final long retentionPeriod,
                                            final boolean withHeaders) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
        this.withHeaders = withHeaders;
    }
@Override
@@ -37,13 +45,22 @@ public class RocksDbSessionBytesStoreSupplier implements
SessionBytesStoreSuppli
@Override
public SessionStore<Bytes, byte[]> get() {
- final RocksDBSegmentedBytesStore segmented = new
RocksDBSegmentedBytesStore(
- name,
- metricsScope(),
- retentionPeriod,
- segmentIntervalMs(),
- new SessionKeySchema());
- return new RocksDBSessionStore(segmented);
+ if (withHeaders) {
+ return new RocksDBSessionStoreWithHeaders(
+ new SessionRocksDBSegmentedBytesStoreWithHeaders(
+ name,
+ metricsScope(),
+ retentionPeriod,
+ segmentIntervalMs(),
+ new SessionKeySchema()));
+ }
+ return new RocksDBSessionStore(
+ new RocksDBSegmentedBytesStore(
+ name,
+ metricsScope(),
+ retentionPeriod,
+ segmentIntervalMs(),
+ new SessionKeySchema()));
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
index 60cd710e6a3..25473fb97d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbTimeOrderedSessionBytesStoreSupplier.java
@@ -24,13 +24,22 @@ public class RocksDbTimeOrderedSessionBytesStoreSupplier
implements SessionBytes
private final String name;
private final long retentionPeriod;
private final boolean withIndex;
+ private final boolean withHeaders;
    /**
     * Creates a supplier for a plain (no-headers) time-ordered session store.
     *
     * @param name            name of the store
     * @param retentionPeriod retention period in milliseconds
     * @param withIndex       whether to maintain the key-based index
     */
    public RocksDbTimeOrderedSessionBytesStoreSupplier(final String name,
                                                       final long retentionPeriod,
                                                       final boolean withIndex) {
        this(name, retentionPeriod, withIndex, false);
    }

    /**
     * Creates a supplier, optionally enabling the header-aware storage format.
     *
     * @param name            name of the store
     * @param retentionPeriod retention period in milliseconds
     * @param withIndex       whether to maintain the key-based index
     * @param withHeaders     whether {@link #get()} returns a header-aware store
     */
    public RocksDbTimeOrderedSessionBytesStoreSupplier(final String name,
                                                       final long retentionPeriod,
                                                       final boolean withIndex,
                                                       final boolean withHeaders) {
        this.name = name;
        this.retentionPeriod = retentionPeriod;
        this.withIndex = withIndex;
        this.withHeaders = withHeaders;
    }
@Override
@@ -40,15 +49,18 @@ public class RocksDbTimeOrderedSessionBytesStoreSupplier
implements SessionBytes
@Override
public SessionStore<Bytes, byte[]> get() {
- return new RocksDBTimeOrderedSessionStore(
+ final RocksDBTimeOrderedSessionSegmentedBytesStore bytesStore =
new RocksDBTimeOrderedSessionSegmentedBytesStore(
name,
metricsScope(),
retentionPeriod,
segmentIntervalMs(),
withIndex
- )
- );
+ );
+ if (withHeaders) {
+ return new RocksDBTimeOrderedSessionStoreWithHeaders(bytesStore);
+ }
+ return new RocksDBTimeOrderedSessionStore(bytesStore);
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionRocksDBSegmentedBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionRocksDBSegmentedBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..e134e8f4d7d
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionRocksDBSegmentedBytesStoreWithHeaders.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+/**
+ * A RocksDB-backed segmented bytes store with headers support.
+ * <p>
+ * This store uses {@link SessionSegmentsWithHeaders} to manage segments,
+ * where each segment is a {@link SessionSegmentWithHeaders} that extends
+ * {@link RocksDBMigratingSessionStoreWithHeaders}. This provides automatic
dual-CF
+ * migration support from plain key-value format to headers format.
+ */
+public class SessionRocksDBSegmentedBytesStoreWithHeaders extends
AbstractRocksDBSegmentedBytesStore<SessionSegmentWithHeaders> {
+
+ SessionRocksDBSegmentedBytesStoreWithHeaders(final String name,
+ final String metricsScope,
+ final long retention,
+ final long segmentInterval,
+ final KeySchema keySchema) {
+ super(name, retention, keySchema, new SessionSegmentsWithHeaders(name,
metricsScope, retention, segmentInterval));
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeaders.java
new file mode 100644
index 00000000000..f93dfdbf2ab
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeaders.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A segment that stores key-value pairs with headers.
+ * <p>
+ * This segment extends {@link RocksDBMigratingSessionStoreWithHeaders} to
provide
+ * header-aware storage with dual-column-family migration support from
+ * plain key-value format to headers format.
+ */
+class SessionSegmentWithHeaders extends
RocksDBMigratingSessionStoreWithHeaders implements Segment {
+
+ public final long id;
+
+ SessionSegmentWithHeaders(final String segmentName,
+ final String windowName,
+ final long id,
+ final Position position,
+ final RocksDBMetricsRecorder metricsRecorder) {
+ super(segmentName, windowName, metricsRecorder);
+ this.id = id;
+ this.position = position;
+ }
+
+ @Override
+ public void destroy() throws IOException {
+ Utils.delete(dbDir);
+ }
+
+ @Override
+ public long id() {
+ return id;
+ }
+
+ @Override
+ public void deleteRange(final Bytes keyFrom, final Bytes keyTo) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void openDB(final Map<String, Object> configs, final File stateDir)
{
+ super.openDB(configs, stateDir);
+ // skip the registering step
+ }
+
+ @Override
+ public String toString() {
+ return "SessionSegmentWithHeaders(id=" + id + ", name=" + name() + ")";
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final SessionSegmentWithHeaders segment = (SessionSegmentWithHeaders)
obj;
+ return id == segment.id;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id);
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
new file mode 100644
index 00000000000..9d940a26453
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeaders.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.ProcessorContextUtils;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+
+/**
+ * Manages the {@link SessionSegmentWithHeaders}s that are used by the {@link
SessionRocksDBSegmentedBytesStoreWithHeaders}.
+ */
+class SessionSegmentsWithHeaders extends
AbstractSegments<SessionSegmentWithHeaders> {
+
+ private final RocksDBMetricsRecorder metricsRecorder;
+
+ SessionSegmentsWithHeaders(final String name,
+ final String metricsScope,
+ final long retentionPeriod,
+ final long segmentInterval) {
+ super(name, retentionPeriod, segmentInterval);
+ metricsRecorder = new RocksDBMetricsRecorder(metricsScope, name);
+ }
+
+ @Override
+ protected SessionSegmentWithHeaders createSegment(final long segmentId,
final String segmentName) {
+ return new SessionSegmentWithHeaders(segmentName, name, segmentId,
position, metricsRecorder);
+ }
+
+ @Override
+ protected void openSegmentDB(final SessionSegmentWithHeaders segment,
final StateStoreContext context) {
+ segment.openDB(context.appConfigs(), context.stateDir());
+ }
+
+ @Override
+ public SessionSegmentWithHeaders getOrCreateSegmentIfLive(final long
segmentId,
+ final
StateStoreContext context,
+ final long
streamTime) {
+ final SessionSegmentWithHeaders segment =
super.getOrCreateSegmentIfLive(segmentId, context, streamTime);
+ cleanupExpiredSegments(streamTime);
+ return segment;
+ }
+
+ @Override
+ public void openExisting(final StateStoreContext context, final long
streamTime) {
+ metricsRecorder.init(ProcessorContextUtils.metricsImpl(context),
context.taskId());
+ super.openExisting(context, streamTime);
+ }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
index 18b7dd89003..d70bcc964db 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractSessionBytesStoreTest.java
@@ -85,8 +85,11 @@ public abstract class AbstractSessionBytesStoreTest {
enum StoreType {
RocksDBSessionStore,
+ RocksDBSessionStoreWithHeaders,
RocksDBTimeOrderedSessionStoreWithIndex,
RocksDBTimeOrderedSessionStoreWithoutIndex,
+ RocksDBTimeOrderedSessionStoreWithHeadersWithIndex,
+ RocksDBTimeOrderedSessionStoreWithHeadersWithoutIndex,
InMemoryStore
}
@@ -130,6 +133,49 @@ public abstract class AbstractSessionBytesStoreTest {
valueSerde
).build();
}
+ case RocksDBSessionStoreWithHeaders: {
+ return Stores.sessionStoreBuilder(
+ new
RocksDbSessionBytesStoreSupplier(ROCK_DB_STORE_NAME, retentionPeriod) {
+ @Override
+ public SessionStore<Bytes, byte[]> get() {
+ return new RocksDBSessionStoreWithHeaders(
+ new RocksDBSegmentedBytesStore(
+ name(), metricsScope(),
retentionPeriod(), segmentIntervalMs(),
+ new SessionKeySchema()));
+ }
+ },
+ keySerde,
+ valueSerde
+ ).build();
+ }
+ case RocksDBTimeOrderedSessionStoreWithHeadersWithIndex: {
+ return Stores.sessionStoreBuilder(
+ new
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME,
retentionPeriod, true) {
+ @Override
+ public SessionStore<Bytes, byte[]> get() {
+ return new
RocksDBTimeOrderedSessionStoreWithHeaders(
+ new
RocksDBTimeOrderedSessionSegmentedBytesStore(
+ name(), metricsScope(),
retentionPeriod(), segmentIntervalMs(), true));
+ }
+ },
+ keySerde,
+ valueSerde
+ ).build();
+ }
+ case RocksDBTimeOrderedSessionStoreWithHeadersWithoutIndex: {
+ return Stores.sessionStoreBuilder(
+ new
RocksDbTimeOrderedSessionBytesStoreSupplier(ROCK_DB_STORE_NAME,
retentionPeriod, false) {
+ @Override
+ public SessionStore<Bytes, byte[]> get() {
+ return new
RocksDBTimeOrderedSessionStoreWithHeaders(
+ new
RocksDBTimeOrderedSessionSegmentedBytesStore(
+ name(), metricsScope(),
retentionPeriod(), segmentIntervalMs(), false));
+ }
+ },
+ keySerde,
+ valueSerde
+ ).build();
+ }
case InMemoryStore: {
return Stores.sessionStoreBuilder(
Stores.inMemorySessionStore(
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
new file mode 100644
index 00000000000..735ffe4167e
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBMigratingSessionStoreWithHeadersTest.java
@@ -0,0 +1,512 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.AggregationWithHeaders;
+import org.apache.kafka.streams.state.KeyValueIterator;
+
+import org.junit.jupiter.api.Test;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.RocksDB;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RocksDBMigratingSessionStoreWithHeadersTest extends
RocksDBStoreTest {
+
+ private final Serializer<String> stringSerializer = new StringSerializer();
+ private final AggregationWithHeadersSerializer<String> aggSerializer =
+ new AggregationWithHeadersSerializer<>(new StringSerializer());
+ private final AggregationWithHeadersDeserializer<String> aggDeserializer =
+ new AggregationWithHeadersDeserializer<>(new StringDeserializer());
+ private final byte[] sessionStoreHeaderColumnFamilyName =
RocksDBMigratingSessionStoreWithHeaders.SESSION_STORE_HEADERS_VALUES_COLUMN_FAMILY_NAME;
+
    // Supplies the store-under-test to the inherited RocksDBStoreTest cases.
    // NOTE(review): presumably overrides RocksDBStoreTest.getRocksDBStore() —
    // consider adding @Override; confirm against the parent class.
    RocksDBStore getRocksDBStore() {
        return new RocksDBMigratingSessionStoreWithHeaders(DB_NAME, METRICS_SCOPE);
    }
+
    @Test
    public void shouldOpenNewStoreInRegularMode() {
        // A brand-new store has no legacy data in the DEFAULT column family,
        // so it must open directly in regular (headers-only) mode.
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
            rocksDBStore.init(context, rocksDBStore);

            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in regular mode"));
        }

        // And the freshly created store is empty.
        try (final KeyValueIterator<Bytes, byte[]> iterator = rocksDBStore.all()) {
            assertFalse(iterator.hasNext());
        }
    }
+
    @Test
    public void shouldOpenExistingStoreInRegularMode() throws Exception {
        final String key = "key";
        final String value = "withHeaders";
        // prepare store: a store written by this class puts data into the headers CF,
        // so reopening must NOT trigger upgrade mode.
        rocksDBStore.init(context, rocksDBStore);
        rocksDBStore.put(new Bytes(key.getBytes()), value.getBytes());
        rocksDBStore.close();

        // re-open store
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
            rocksDBStore.init(context, rocksDBStore);

            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in regular mode"));
        } finally {
            rocksDBStore.close();
        }

        // verify store: open the raw RocksDB with both column families and check
        // that the record lives only in the headers CF.
        final DBOptions dbOptions = new DBOptions();
        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();

        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
            new ColumnFamilyDescriptor(sessionStoreHeaderColumnFamilyName, columnFamilyOptions));
        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());

        RocksDB db = null;
        ColumnFamilyHandle defaultColumnFamily = null, headersColumnFamily = null;
        try {
            db = RocksDB.open(
                dbOptions,
                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
                columnFamilyDescriptors,
                columnFamilies);

            defaultColumnFamily = columnFamilies.get(0);
            headersColumnFamily = columnFamilies.get(1);

            // DEFAULT CF stays empty; the headers CF holds the single record.
            assertNull(db.get(defaultColumnFamily, "key".getBytes()));
            assertEquals(0L, db.getLongProperty(defaultColumnFamily, "rocksdb.estimate-num-keys"));
            assertEquals(value.getBytes().length, db.get(headersColumnFamily, "key".getBytes()).length);
            assertEquals(1L, db.getLongProperty(headersColumnFamily, "rocksdb.estimate-num-keys"));
        } finally {
            // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
            if (defaultColumnFamily != null) {
                defaultColumnFamily.close();
            }
            if (headersColumnFamily != null) {
                headersColumnFamily.close();
            }
            if (db != null) {
                db.close();
            }
            dbOptions.close();
            columnFamilyOptions.close();
        }
    }
+
    // End-to-end lazy-migration test: seeds legacy-format data into the DEFAULT CF,
    // reopens the store (upgrade mode), and verifies each store operation migrates
    // or converts values as expected.
    // NOTE(review): prepareDefaultStore() / assertMigratedValue() /
    // verifyOldAndNewColumnFamily() are defined elsewhere in this class — the seeded
    // keys (key1..key8) are assumed from their usage here; confirm against those helpers.
    @Test
    public void shouldMigrateFromDefaultToHeadersAwareColumnFamily() throws Exception {
        prepareDefaultStore();

        // Open with RocksDBMigratingSessionStoreWithHeaders - should detect legacy data in DEFAULT CF and enter upgrade mode
        try (final LogCaptureAppender appender = LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class)) {
            rocksDBStore.init(context, rocksDBStore);

            assertTrue(appender.getMessages().contains("Opening store " + DB_NAME + " in upgrade mode"));
        }

        // get() - tests lazy migration on read: legacy value converted via convertToHeaderFormat
        assertNull(rocksDBStore.get(new Bytes("unknown".getBytes())), "Expected null for unknown key");

        final byte[] key1Result = rocksDBStore.get(new Bytes("key1".getBytes()));
        assertMigratedValue(key1Result, "1");

        // put() - tests migration on write using properly serialized AggregationWithHeaders
        final byte[] key2Value = serializeAggWithHeaders("22", testHeaders());
        rocksDBStore.put(new Bytes("key2".getBytes()), key2Value);

        // put(null) acts as a delete of the legacy entry
        rocksDBStore.put(new Bytes("key3".getBytes()), null);

        final byte[] key8Value = serializeAggWithHeaders("88888888", new RecordHeaders());
        rocksDBStore.put(new Bytes("key8new".getBytes()), key8Value);

        // putIfAbsent() - tests migration on conditional write
        final byte[] key11Value = serializeAggWithHeaders("11111111111", testHeaders());
        assertNull(rocksDBStore.putIfAbsent(new Bytes("key11new".getBytes()), key11Value),
            "Expected null return value for putIfAbsent on non-existing key11new");

        // putIfAbsent on an existing legacy key returns the migrated old value
        final byte[] key5Result = rocksDBStore.putIfAbsent(new Bytes("key5".getBytes()), null);
        assertMigratedValue(key5Result, "55555");

        assertNull(rocksDBStore.putIfAbsent(new Bytes("key12new".getBytes()), null));

        // delete() - tests migration on delete
        final byte[] key6Result = rocksDBStore.delete(new Bytes("key6".getBytes()));
        assertMigratedValue(key6Result, "666666");

        // iterators should not trigger migration (read-only)
        iteratorsShouldNotMigrateData();

        rocksDBStore.close();

        // Verify the final state of both column families
        verifyOldAndNewColumnFamily();
    }
+
+    /**
+     * Verifies that all iterator types ({@code all}, {@code range}, {@code reverseAll},
+     * {@code reverseRange}, {@code prefixScan}) return every key across both column families
+     * without migrating any data: iteration is read-only, and values still living in the
+     * DEFAULT CF are converted to the header-aware format on the fly.
+     */
+    private void iteratorsShouldNotMigrateData() {
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.all()) {
+            assertNextMigrated(itAll, "key1", "1");                  // migrated by get()
+            assertNextWithHeaders(itAll, "key11new", "11111111111"); // inserted by putIfAbsent()
+            assertNextWithHeaders(itAll, "key2", "22");              // replaced by put()
+            assertNextMigrated(itAll, "key4", "4444");               // not migrated, on-the-fly conversion
+            assertNextMigrated(itAll, "key5", "55555");              // migrated by putIfAbsent with null value
+            assertNextMigrated(itAll, "key7", "7777777");            // not migrated, on-the-fly conversion
+            assertNextMigrated(itAll, "key8new", "88888888");        // inserted by put()
+            assertFalse(itAll.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                 rocksDBStore.range(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) {
+            assertNextWithHeaders(it, "key2", "22");
+            assertNextMigrated(it, "key4", "4444");
+            assertNextMigrated(it, "key5", "55555");
+            assertFalse(it.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> itAll = rocksDBStore.reverseAll()) {
+            assertNextMigrated(itAll, "key8new", "88888888");
+            assertNextMigrated(itAll, "key7", "7777777");
+            assertNextMigrated(itAll, "key5", "55555");
+            assertNextMigrated(itAll, "key4", "4444");
+            assertNextWithHeaders(itAll, "key2", "22");
+            assertNextWithHeaders(itAll, "key11new", "11111111111");
+            assertNextMigrated(itAll, "key1", "1");
+            assertFalse(itAll.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it =
+                 rocksDBStore.reverseRange(new Bytes("key2".getBytes()), new Bytes("key5".getBytes()))) {
+            assertNextMigrated(it, "key5", "55555");
+            assertNextMigrated(it, "key4", "4444");
+            assertNextWithHeaders(it, "key2", "22");
+            assertFalse(it.hasNext());
+        }
+
+        try (final KeyValueIterator<Bytes, byte[]> it = rocksDBStore.prefixScan("key1", stringSerializer)) {
+            assertNextMigrated(it, "key1", "1");
+            assertNextWithHeaders(it, "key11new", "11111111111");
+            assertFalse(it.hasNext());
+        }
+    }
+
+    // Asserts the iterator's next entry has the expected key and a migrated value (empty headers).
+    private void assertNextMigrated(final KeyValueIterator<Bytes, byte[]> iterator,
+                                    final String expectedKey,
+                                    final String expectedAggregation) {
+        final KeyValue<Bytes, byte[]> keyValue = iterator.next();
+        assertArrayEquals(expectedKey.getBytes(), keyValue.key.get());
+        assertMigratedValue(keyValue.value, expectedAggregation);
+    }
+
+    // Asserts the iterator's next entry has the expected key and a value carrying testHeaders().
+    private void assertNextWithHeaders(final KeyValueIterator<Bytes, byte[]> iterator,
+                                       final String expectedKey,
+                                       final String expectedAggregation) {
+        final KeyValue<Bytes, byte[]> keyValue = iterator.next();
+        assertArrayEquals(expectedKey.getBytes(), keyValue.key.get());
+        assertValueWithHeaders(keyValue.value, expectedAggregation, testHeaders());
+    }
+
+    private void verifyOldAndNewColumnFamily() throws Exception {
+        // 1) Inspect both CFs directly and check exactly the expected entries were (not) migrated.
+        verifyColumnFamilyContents();
+        // 2) DEFAULT CF still holds legacy data, so re-opening must stay in upgrade mode.
+        verifyStillInUpgradeMode();
+        // 3) Delete the remaining legacy keys so the DEFAULT CF becomes empty.
+        clearDefaultColumnFamily();
+        // 4) With the DEFAULT CF empty, re-opening must switch to regular mode.
+        verifyInRegularMode();
+    }
+
+    /**
+     * Opens the store's RocksDB instance directly (outside of Streams) and verifies the
+     * contents of both the DEFAULT and the headers column family.
+     */
+    private void verifyColumnFamilyContents() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(sessionStoreHeaderColumnFamilyName, columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultColumnFamily = null;
+        ColumnFamilyHandle headersColumnFamily = null;
+        try {
+            // RocksDB.open() fills columnFamilies with one handle per descriptor, in order.
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultColumnFamily = columnFamilies.get(0);
+            headersColumnFamily = columnFamilies.get(1);
+
+            verifyDefaultColumnFamily(db, defaultColumnFamily);
+            verifyHeadersColumnFamily(db, headersColumnFamily);
+        } finally {
+            // Handles must be closed before the DB, and the DB before its options.
+            closeColumnFamilies(db, defaultColumnFamily, headersColumnFamily);
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+    /**
+     * Checks the DEFAULT column family: keys touched by get()/put()/putIfAbsent()/delete()
+     * must have been removed (migrated or deleted), while untouched keys still hold their
+     * plain (pre-headers) aggregation bytes.
+     */
+    private void verifyDefaultColumnFamily(final RocksDB db, final ColumnFamilyHandle defaultColumnFamily) throws Exception {
+        // DEFAULT CF should have un-migrated keys with plain aggregation bytes; migrated keys should be deleted
+        assertNull(db.get(defaultColumnFamily, "unknown".getBytes()));
+        assertNull(db.get(defaultColumnFamily, "key1".getBytes())); // migrated by get()
+        assertNull(db.get(defaultColumnFamily, "key2".getBytes())); // migrated by put()
+        assertNull(db.get(defaultColumnFamily, "key3".getBytes())); // deleted
+        assertArrayEquals("4444".getBytes(), db.get(defaultColumnFamily, "key4".getBytes())); // not migrated, plain aggregation
+        assertNull(db.get(defaultColumnFamily, "key5".getBytes())); // migrated by putIfAbsent()
+        assertNull(db.get(defaultColumnFamily, "key6".getBytes())); // migrated by delete()
+        assertArrayEquals("7777777".getBytes(), db.get(defaultColumnFamily, "key7".getBytes())); // not migrated, plain aggregation
+        assertNull(db.get(defaultColumnFamily, "key8new".getBytes())); // new keys are written to the headers CF only
+        assertNull(db.get(defaultColumnFamily, "key11new".getBytes()));
+    }
+
+    /**
+     * Checks the headers column family: every migrated or newly written key must be present
+     * in the headers-aware format; keys never touched, or deleted, must be absent.
+     */
+    private void verifyHeadersColumnFamily(final RocksDB db, final ColumnFamilyHandle headersColumnFamily) throws Exception {
+        // Headers CF should have all migrated/new keys in headers-aware format
+        assertNull(db.get(headersColumnFamily, "unknown".getBytes()));
+        assertMigratedValue(db.get(headersColumnFamily, "key1".getBytes()), "1"); // migrated by get()
+        assertValueWithHeaders(db.get(headersColumnFamily, "key2".getBytes()), "22", testHeaders()); // put with headers
+        assertNull(db.get(headersColumnFamily, "key3".getBytes())); // put with null value => deleted
+        assertNull(db.get(headersColumnFamily, "key4".getBytes())); // not migrated, still in DEFAULT CF
+        assertMigratedValue(db.get(headersColumnFamily, "key5".getBytes()), "55555"); // migrated by putIfAbsent
+        assertNull(db.get(headersColumnFamily, "key6".getBytes())); // migrated by delete() => deleted
+        assertNull(db.get(headersColumnFamily, "key7".getBytes())); // not migrated, still in DEFAULT CF
+        assertMigratedValue(db.get(headersColumnFamily, "key8new".getBytes()), "88888888"); // put with empty headers
+        assertValueWithHeaders(db.get(headersColumnFamily, "key11new".getBytes()), "11111111111", testHeaders()); // putIfAbsent with headers
+        assertNull(db.get(headersColumnFamily, "key12new".getBytes())); // putIfAbsent with null value on non-existing key
+    }
+
+    /**
+     * Closes the given native resources in the required order: all column family handles
+     * first, then the RocksDB instance. {@code null} arguments are skipped, so this is safe
+     * to call from a {@code finally} block even when {@code RocksDB.open()} failed.
+     */
+    private void closeColumnFamilies(
+        final RocksDB db,
+        final ColumnFamilyHandle defaultColumnFamily,
+        final ColumnFamilyHandle headersColumnFamily) {
+        // Order of closing must follow: ColumnFamilyHandle > RocksDB
+        for (final ColumnFamilyHandle handle : new ColumnFamilyHandle[] {defaultColumnFamily, headersColumnFamily}) {
+            if (handle != null) {
+                handle.close();
+            }
+        }
+        if (db != null) {
+            db.close();
+        }
+    }
+
+ private void verifyStillInUpgradeMode() {
+ // check that still in upgrade mode
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in upgrade mode"));
+ } finally {
+ rocksDBStore.close();
+ }
+ }
+
+    /**
+     * Deletes the two keys that were never migrated so the DEFAULT column family becomes
+     * empty, which lets the store open in regular mode on the next {@code init()}.
+     *
+     * Reuses {@link #closeColumnFamilies} instead of duplicating the handle/DB close
+     * ordering inline, keeping the cleanup consistent with {@code verifyColumnFamilyContents}.
+     */
+    private void clearDefaultColumnFamily() throws Exception {
+        final DBOptions dbOptions = new DBOptions();
+        final ColumnFamilyOptions columnFamilyOptions = new ColumnFamilyOptions();
+
+        final List<ColumnFamilyDescriptor> columnFamilyDescriptors = asList(
+            new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnFamilyOptions),
+            new ColumnFamilyDescriptor(sessionStoreHeaderColumnFamilyName, columnFamilyOptions));
+
+        final List<ColumnFamilyHandle> columnFamilies = new ArrayList<>(columnFamilyDescriptors.size());
+        RocksDB db = null;
+        ColumnFamilyHandle defaultCF = null;
+        ColumnFamilyHandle headersCF = null;
+        try {
+            db = RocksDB.open(
+                dbOptions,
+                new File(new File(context.stateDir(), "rocksdb"), DB_NAME).getAbsolutePath(),
+                columnFamilyDescriptors,
+                columnFamilies);
+
+            defaultCF = columnFamilies.get(0);
+            headersCF = columnFamilies.get(1);
+            db.delete(defaultCF, "key4".getBytes());
+            db.delete(defaultCF, "key7".getBytes());
+        } finally {
+            // Order of closing must follow: ColumnFamilyHandle > RocksDB > DBOptions > ColumnFamilyOptions
+            closeColumnFamilies(db, defaultCF, headersCF);
+            dbOptions.close();
+            columnFamilyOptions.close();
+        }
+    }
+
+ private void verifyInRegularMode() {
+ // check that now in regular mode (all legacy data migrated)
+ try (LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(RocksDBMigratingSessionStoreWithHeaders.class))
{
+ rocksDBStore.init(context, rocksDBStore);
+
+ assertTrue(appender.getMessages().contains("Opening store " +
DB_NAME + " in regular mode"));
+ }
+ }
+
+    /**
+     * Seeds a plain (pre-headers) {@code RocksDBStore} so that all records end up in the
+     * DEFAULT column family, simulating data written before the headers upgrade.
+     */
+    private void prepareDefaultStore() {
+        final RocksDBStore kvStore = new RocksDBStore(DB_NAME, METRICS_SCOPE);
+        try {
+            kvStore.init(context, kvStore);
+
+            // keyN maps to the digit N repeated N times, e.g. key3 -> "333".
+            final String[] legacyValues = {"1", "22", "333", "4444", "55555", "666666", "7777777"};
+            for (int i = 0; i < legacyValues.length; i++) {
+                kvStore.put(new Bytes(("key" + (i + 1)).getBytes()), legacyValues[i].getBytes());
+            }
+        } finally {
+            kvStore.close();
+        }
+    }
+
+    /**
+     * Returns a fresh {@link Headers} instance with the single well-known header used
+     * throughout this test to verify header round-tripping.
+     */
+    private Headers testHeaders() {
+        final RecordHeaders recordHeaders = new RecordHeaders();
+        recordHeaders.add(new RecordHeader("source", "test".getBytes(StandardCharsets.UTF_8)));
+        return recordHeaders;
+    }
+
+    // Serializes the given aggregation plus headers into the header-aware on-disk format.
+    private byte[] serializeAggWithHeaders(final String aggregation, final Headers headers) {
+        return aggSerializer.serialize(null, AggregationWithHeaders.make(aggregation, headers));
+    }
+
+    /**
+     * Asserts that {@code value} is in the header-aware format with NO headers (the shape
+     * produced by lazy migration of a legacy record) and that the wrapped aggregation
+     * bytes equal {@code expectedAggregation}.
+     */
+    private void assertMigratedValue(final byte[] value, final String expectedAggregation) {
+        final Headers headers = AggregationWithHeadersDeserializer.headers(value);
+        assertFalse(headers.iterator().hasNext(), "Migrated value should have empty headers");
+
+        final byte[] expectedAggregationBytes = expectedAggregation.getBytes(StandardCharsets.UTF_8);
+        assertArrayEquals(
+            expectedAggregationBytes,
+            AggregationWithHeadersDeserializer.rawAggregation(value),
+            "Migrated value should preserve original aggregation: " + expectedAggregation);
+    }
+
+    /**
+     * Asserts that {@code value} deserializes to the expected aggregation and that every
+     * expected header is present (by last occurrence of its key) with a matching value.
+     */
+    private void assertValueWithHeaders(final byte[] value, final String expectedAggregation, final Headers expectedHeaders) {
+        final AggregationWithHeaders<String> deserialized = aggDeserializer.deserialize(null, value);
+        assertEquals(expectedAggregation, deserialized.aggregation());
+        expectedHeaders.forEach(header -> assertArrayEquals(
+            header.value(),
+            deserialized.headers().lastHeader(header.key()).value(),
+            "Expected header '" + header.key() + "' to match"));
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
new file mode 100644
index 00000000000..2f9c522f368
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeadersTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RocksDBSessionStoreWithHeadersTest {
+
+    @Test
+    public void shouldThrowUnsupportedOperationOnQuery() {
+        // IQv2 query() is not supported by the headers-aware session store.
+        final RocksDBSegmentedBytesStore bytesStore =
+            new RocksDBSegmentedBytesStore("test", "scope", 10_000L, 60_000L, new SessionKeySchema());
+        final RocksDBSessionStoreWithHeaders store = new RocksDBSessionStoreWithHeaders(bytesStore);
+
+        assertThrows(UnsupportedOperationException.class, () -> store.query(null, null, null));
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
new file mode 100644
index 00000000000..761058193f1
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionStoreWithHeadersTest.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RocksDBTimeOrderedSessionStoreWithHeadersTest {
+
+    @Test
+    public void shouldThrowUnsupportedOperationOnQueryWithIndex() {
+        assertQueryUnsupported(true);
+    }
+
+    @Test
+    public void shouldThrowUnsupportedOperationOnQueryWithOutIndex() {
+        assertQueryUnsupported(false);
+    }
+
+    /**
+     * Builds a time-ordered session store (with or without the key index) and verifies
+     * that IQv2 {@code query()} is unsupported. Shared by both tests, which differ only
+     * in the {@code withIndex} flag.
+     */
+    private void assertQueryUnsupported(final boolean withIndex) {
+        final RocksDBTimeOrderedSessionStoreWithHeaders store = new RocksDBTimeOrderedSessionStoreWithHeaders(
+            new RocksDBTimeOrderedSessionSegmentedBytesStore("test", "scope", 10_000L, 60_000L, withIndex));
+        assertThrows(UnsupportedOperationException.class, () -> store.query(null, null, null));
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeadersTest.java
new file mode 100644
index 00000000000..1aeba768601
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentWithHeadersTest.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.metrics.RocksDBMetricsRecorder;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.io.File;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static
org.apache.kafka.streams.StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class SessionSegmentWithHeadersTest {
+
+    // Recorder shared by every segment under test; re-initialized with fresh metrics per test.
+    private final RocksDBMetricsRecorder metricsRecorder =
+        new RocksDBMetricsRecorder("metrics-scope", "store-name");
+
+    @BeforeEach
+    public void setUp() {
+        metricsRecorder.init(
+            new StreamsMetricsImpl(new Metrics(), "test-client", new MockTime()),
+            new TaskId(0, 0)
+        );
+    }
+
+    @Test
+    public void shouldDeleteStateDirectoryOnDestroy() throws Exception {
+        final SessionSegmentWithHeaders segment =
+            new SessionSegmentWithHeaders("segment", "window", 0L, Position.emptyPosition(), metricsRecorder);
+        final String directoryPath = TestUtils.tempDirectory().getAbsolutePath();
+        final File directory = new File(directoryPath);
+
+        segment.openDB(mkMap(mkEntry(METRICS_RECORDING_LEVEL_CONFIG, "INFO")), directory);
+
+        // openDB() creates <dir>/window/segment and populates it with RocksDB files.
+        assertTrue(new File(directoryPath, "window").exists());
+        assertTrue(new File(directoryPath + File.separator + "window", "segment").exists());
+        assertTrue(new File(directoryPath + File.separator + "window", "segment").list().length > 0);
+        // destroy() removes only the segment directory, not the parent store directory.
+        segment.destroy();
+        assertFalse(new File(directoryPath + File.separator + "window", "segment").exists());
+        assertTrue(new File(directoryPath, "window").exists());
+
+        segment.close();
+    }
+
+    @Test
+    public void shouldBeEqualIfIdIsEqual() {
+        // equals() must depend on the segment id only, not on the name.
+        final SessionSegmentWithHeaders segment =
+            new SessionSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentSameId =
+            new SessionSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentDifferentId =
+            new SessionSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
+
+        assertEquals(segment, segment);
+        assertEquals(segment, segmentSameId);
+        assertNotEquals(segment, segmentDifferentId);
+        assertNotEquals(segment, null);
+        assertNotEquals(segment, "anyName"); // different type is never equal
+
+        segment.close();
+        segmentSameId.close();
+        segmentDifferentId.close();
+    }
+
+    @Test
+    public void shouldHashOnSegmentIdOnly() {
+        // hashCode() must also depend on the id only, so same-id segments collide in a set.
+        final SessionSegmentWithHeaders segment =
+            new SessionSegmentWithHeaders("anyName", "anyName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentSameId =
+            new SessionSegmentWithHeaders("someOtherName", "someOtherName", 0L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segmentDifferentId =
+            new SessionSegmentWithHeaders("anyName", "anyName", 1L, Position.emptyPosition(), metricsRecorder);
+
+        final Set<SessionSegmentWithHeaders> set = new HashSet<>();
+        assertTrue(set.add(segment));
+        assertFalse(set.add(segmentSameId)); // duplicate id rejected
+        assertTrue(set.add(segmentDifferentId));
+
+        segment.close();
+        segmentSameId.close();
+        segmentDifferentId.close();
+    }
+
+    @Test
+    public void shouldCompareSegmentIdOnly() {
+        // compareTo() orders by id; names are chosen in reverse order to prove they are ignored.
+        final SessionSegmentWithHeaders segment1 =
+            new SessionSegmentWithHeaders("a", "C", 50L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segment2 =
+            new SessionSegmentWithHeaders("b", "B", 100L, Position.emptyPosition(), metricsRecorder);
+        final SessionSegmentWithHeaders segment3 =
+            new SessionSegmentWithHeaders("c", "A", 0L, Position.emptyPosition(), metricsRecorder);
+
+        assertEquals(0, segment1.compareTo(segment1));
+        assertEquals(-1, segment1.compareTo(segment2));
+        assertEquals(1, segment2.compareTo(segment1));
+        assertEquals(1, segment1.compareTo(segment3));
+        assertEquals(-1, segment3.compareTo(segment1));
+        assertEquals(1, segment2.compareTo(segment3));
+        assertEquals(-1, segment3.compareTo(segment2));
+
+        segment1.close();
+        segment2.close();
+        segment3.close();
+    }
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
new file mode 100644
index 00000000000..eea7d60a16f
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/SessionSegmentsWithHeadersTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class SessionSegmentsWithHeadersTest {
+
+    private static final long SEGMENT_INTERVAL = 100L;
+    private static final long RETENTION_PERIOD = 4 * SEGMENT_INTERVAL;
+    private static final String METRICS_SCOPE = "test-state-id";
+    private InternalMockProcessorContext context;
+    private SessionSegmentsWithHeaders segments;
+    private File stateDirectory;
+    private final String storeName = "test";
+
+    @BeforeEach
+    public void createContext() {
+        stateDirectory = TestUtils.tempDirectory();
+        context = new InternalMockProcessorContext<>(
+            stateDirectory,
+            Serdes.String(),
+            Serdes.Long(),
+            new MockRecordCollector(),
+            new ThreadCache(new LogContext("testCache "), 0, new MockStreamsMetrics(new Metrics()))
+        );
+        segments = new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, RETENTION_PERIOD, SEGMENT_INTERVAL);
+        segments.openExisting(context, -1L);
+    }
+
+    @AfterEach
+    public void close() {
+        segments.close();
+    }
+
+    @Test
+    public void shouldGetSegmentIdsFromTimestamp() {
+        assertEquals(0, segments.segmentId(0));
+        assertEquals(1, segments.segmentId(SEGMENT_INTERVAL));
+        assertEquals(2, segments.segmentId(2 * SEGMENT_INTERVAL));
+        assertEquals(3, segments.segmentId(3 * SEGMENT_INTERVAL));
+    }
+
+    @Test
+    public void shouldGetSegmentNameFromId() {
+        // Segment names embed the segment's start timestamp, not its id.
+        assertEquals("test.0", segments.segmentName(0));
+        assertEquals("test." + SEGMENT_INTERVAL, segments.segmentName(1));
+        assertEquals("test." + 2 * SEGMENT_INTERVAL, segments.segmentName(2));
+    }
+
+    @Test
+    public void shouldCreateSegments() {
+        final SessionSegmentWithHeaders segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final SessionSegmentWithHeaders segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final SessionSegmentWithHeaders segment3 = segments.getOrCreateSegmentIfLive(2, context, -1L);
+
+        assertTrue(new File(context.stateDir(), "test/test.0").isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).isDirectory());
+        assertTrue(new File(context.stateDir(), "test/test." + 2 * SEGMENT_INTERVAL).isDirectory());
+        assertTrue(segment1.isOpen());
+        assertTrue(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+    }
+
+    @Test
+    public void shouldNotCreateSegmentThatIsAlreadyExpired() {
+        // Advancing stream time to segment 7 puts segment 0 outside the retention period.
+        final long streamTime = updateStreamTimeAndCreateSegment(7);
+        assertNull(segments.getOrCreateSegmentIfLive(0, context, streamTime));
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
+    }
+
+    @Test
+    public void shouldCleanupSegmentsThatHaveExpired() {
+        final SessionSegmentWithHeaders segment1 = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final SessionSegmentWithHeaders segment2 = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        // Creating segment 7 at stream time 7 * interval expires segments 0 and 1.
+        final SessionSegmentWithHeaders segment3 = segments.getOrCreateSegmentIfLive(7, context, SEGMENT_INTERVAL * 7L);
+
+        assertFalse(segment1.isOpen());
+        assertFalse(segment2.isOpen());
+        assertTrue(segment3.isOpen());
+        assertFalse(new File(context.stateDir(), "test/test.0").exists());
+        assertFalse(new File(context.stateDir(), "test/test." + SEGMENT_INTERVAL).exists());
+        assertTrue(new File(context.stateDir(), "test/test." + 7 * SEGMENT_INTERVAL).exists());
+    }
+
+    @Test
+    public void shouldGetSegmentForTimestamp() {
+        final SessionSegmentWithHeaders segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        assertEquals(segment, segments.segmentForTimestamp(0L));
+    }
+
+    @Test
+    public void shouldGetCorrectSegmentString() {
+        final SessionSegmentWithHeaders segment = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        assertEquals("SessionSegmentWithHeaders(id=0, name=test.0)", segment.toString());
+    }
+
+    @Test
+    public void shouldCloseAllOpenSegments() {
+        final SessionSegmentWithHeaders first = segments.getOrCreateSegmentIfLive(0, context, -1L);
+        final SessionSegmentWithHeaders second = segments.getOrCreateSegmentIfLive(1, context, -1L);
+        final SessionSegmentWithHeaders third = segments.getOrCreateSegmentIfLive(2, context, -1L);
+        segments.close();
+
+        assertFalse(first.isOpen());
+        assertFalse(second.isOpen());
+        assertFalse(third.isOpen());
+    }
+
+    @Test
+    public void shouldOpenExistingSegments() {
+        // Use segment interval 1 so ids and timestamps coincide.
+        segments = new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, 4, 1);
+        segments.openExisting(context, -1L);
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.getOrCreateSegmentIfLive(1, context, -1L);
+        segments.getOrCreateSegmentIfLive(2, context, -1L);
+        segments.getOrCreateSegmentIfLive(3, context, -1L);
+        segments.getOrCreateSegmentIfLive(4, context, -1L);
+        // close existing.
+        segments.close();
+
+        // A brand-new instance must re-open all segment directories found on disk.
+        segments = new SessionSegmentsWithHeaders(storeName, METRICS_SCOPE, 4, 1);
+        segments.openExisting(context, -1L);
+
+        assertTrue(segments.segmentForTimestamp(0).isOpen());
+        assertTrue(segments.segmentForTimestamp(1).isOpen());
+        assertTrue(segments.segmentForTimestamp(2).isOpen());
+        assertTrue(segments.segmentForTimestamp(3).isOpen());
+        assertTrue(segments.segmentForTimestamp(4).isOpen());
+    }
+
+    @Test
+    public void shouldGetSegmentsWithinTimeRange() {
+        updateStreamTimeAndCreateSegment(0);
+        updateStreamTimeAndCreateSegment(1);
+        updateStreamTimeAndCreateSegment(2);
+        updateStreamTimeAndCreateSegment(3);
+        final long streamTime = updateStreamTimeAndCreateSegment(4);
+        segments.getOrCreateSegmentIfLive(0, context, streamTime);
+        segments.getOrCreateSegmentIfLive(1, context, streamTime);
+        segments.getOrCreateSegmentIfLive(2, context, streamTime);
+        segments.getOrCreateSegmentIfLive(3, context, streamTime);
+        segments.getOrCreateSegmentIfLive(4, context, streamTime);
+
+        // Distinct local name avoids shadowing the 'segments' field (which forced 'this.segments' before).
+        final List<SessionSegmentWithHeaders> segmentsInRange = segments.segments(0, 2 * SEGMENT_INTERVAL, true);
+        assertEquals(3, segmentsInRange.size());
+        assertEquals(0, segmentsInRange.get(0).id);
+        assertEquals(1, segmentsInRange.get(1).id);
+        assertEquals(2, segmentsInRange.get(2).id);
+    }
+
+    @Test
+    public void shouldClearSegmentsOnClose() {
+        segments.getOrCreateSegmentIfLive(0, context, -1L);
+        segments.close();
+        assertNull(segments.segmentForTimestamp(0));
+    }
+
+    // Advances stream time to the given segment's start and creates that segment if live.
+    private long updateStreamTimeAndCreateSegment(final int segment) {
+        final long streamTime = SEGMENT_INTERVAL * segment;
+        segments.getOrCreateSegmentIfLive(segment, context, streamTime);
+        return streamTime;
+    }
+}