This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2d68b60c16e KAFKA-20134: Implement TimestampedWindowStoreWithHeaders
(2/N) (#21478)
2d68b60c16e is described below
commit 2d68b60c16e0a4743af9ea603c1d371d2a9dbc10
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 20 10:58:06 2026 +0000
KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (2/N) (#21478)
This PR adds `RocksDBTimestampedWindowStoreWithHeaders` for the
`TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
<[email protected]>
---
.../RocksDBTimestampedWindowStoreWithHeaders.java | 50 ++++++++++++++++++++++
.../internals/RocksDbWindowBytesStoreSupplier.java | 13 +++++-
2 files changed, 62 insertions(+), 1 deletion(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
new file mode 100644
index 00000000000..5088a1ff207
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+
+/**
+ * RocksDB-backed timestamped window store with support for record headers.
+ * <p>
+ * This store extends {@link RocksDBWindowStore} and implements both
+ * {@link TimestampedBytesStore} (for timestamp support) and {@link HeadersBytesStore}
+ * (for header support) marker interfaces; it only declares a constructor delegating to the superclass.
+ * <p>
+ * The storage format for values is: {@code [headersSize(varint)][headersBytes][timestamp(8)][value]}
+ * <p>
+ * This implementation uses segment-level versioning for backward compatibility:
+ * <ul>
+ * <li>Old segments continue to use the legacy format without headers</li>
+ * <li>New segments use the header-embedded format</li>
+ * <li>Legacy values are served with empty headers on read</li>
+ * <li>All new writes use the new format</li>
+ * </ul>
+ *
+ * @see RocksDBWindowStore
+ * @see HeadersBytesStore
+ * @see TimestampedBytesStore
+ */
+class RocksDBTimestampedWindowStoreWithHeaders extends RocksDBWindowStore
implements TimestampedBytesStore, HeadersBytesStore {
+
+ RocksDBTimestampedWindowStoreWithHeaders(final SegmentedBytesStore
bytesStore,
+ final boolean retainDuplicates,
+ final long windowSize) {
+ super(bytesStore, retainDuplicates, windowSize);
+ }
+}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 3ee5b88328b..14213b659e5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -23,7 +23,8 @@ import org.apache.kafka.streams.state.WindowStore;
public class RocksDbWindowBytesStoreSupplier implements
WindowBytesStoreSupplier {
public enum WindowStoreTypes {
DEFAULT_WINDOW_STORE,
- TIMESTAMPED_WINDOW_STORE
+ TIMESTAMPED_WINDOW_STORE,
+ TIMESTAMPED_WINDOW_STORE_WITH_HEADERS
}
private final String name;
@@ -87,6 +88,16 @@ public class RocksDbWindowBytesStoreSupplier implements
WindowBytesStoreSupplier
new WindowKeySchema()),
retainDuplicates,
windowSize);
+ case TIMESTAMPED_WINDOW_STORE_WITH_HEADERS:
+ return new RocksDBTimestampedWindowStoreWithHeaders(
+ new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+ name,
+ metricsScope(),
+ retentionPeriod,
+ segmentInterval,
+ new WindowKeySchema()),
+ retainDuplicates,
+ windowSize);
default:
throw new IllegalArgumentException("invalid window store type:
" + windowStoreType);
}