This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2d68b60c16e KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (2/N) (#21478)
2d68b60c16e is described below

commit 2d68b60c16e0a4743af9ea603c1d371d2a9dbc10
Author: TengYao Chi <[email protected]>
AuthorDate: Fri Feb 20 10:58:06 2026 +0000

    KAFKA-20134: Implement TimestampedWindowStoreWithHeaders (2/N) (#21478)
    
    This PR adds `RocksDBTimestampedWindowStoreWithHeaders` for the
    `TimestampedWindowStoreWithHeaders` introduced in KIP-1271.
    
    Reviewers: Alieh Saeedi <[email protected]>, Bill Bejeck
    <[email protected]>
---
 .../RocksDBTimestampedWindowStoreWithHeaders.java  | 50 ++++++++++++++++++++++
 .../internals/RocksDbWindowBytesStoreSupplier.java | 13 +++++-
 2 files changed, 62 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
new file mode 100644
index 00000000000..5088a1ff207
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimestampedWindowStoreWithHeaders.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.state.HeadersBytesStore;
+import org.apache.kafka.streams.state.TimestampedBytesStore;
+
+/**
+ * RocksDB-backed timestamped window store with support for record headers.
+ * <p>
+ * This store extends {@link RocksDBWindowStore} and implements both
+ * {@link TimestampedBytesStore} (for timestamp support) and {@link HeadersBytesStore}
+ * (for header support) marker interfaces.
+ * <p>
+ * The storage format for values is: [headersSize(varint)][headersBytes][timestamp(8)][value]
+ * <p>
+ * This implementation uses segment-level versioning for backward compatibility:
+ * <ul>
+ * <li>Old segments continue to use the legacy format without headers</li>
+ * <li>New segments use the header-embedded format</li>
+ * <li>Legacy values are served with empty headers on read</li>
+ * <li>All new writes use the new format</li>
+ * </ul>
+ *
+ * @see RocksDBWindowStore
+ * @see HeadersBytesStore
+ * @see TimestampedBytesStore
+ */
+class RocksDBTimestampedWindowStoreWithHeaders extends RocksDBWindowStore 
implements TimestampedBytesStore, HeadersBytesStore {
+
+    RocksDBTimestampedWindowStoreWithHeaders(final SegmentedBytesStore 
bytesStore,
+                                             final boolean retainDuplicates,
+                                             final long windowSize) {
+        super(bytesStore, retainDuplicates, windowSize);
+    }
+}
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
index 3ee5b88328b..14213b659e5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDbWindowBytesStoreSupplier.java
@@ -23,7 +23,8 @@ import org.apache.kafka.streams.state.WindowStore;
 public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier {
     public enum WindowStoreTypes {
         DEFAULT_WINDOW_STORE,
-        TIMESTAMPED_WINDOW_STORE
+        TIMESTAMPED_WINDOW_STORE,
+        TIMESTAMPED_WINDOW_STORE_WITH_HEADERS
     }
 
     private final String name;
@@ -87,6 +88,16 @@ public class RocksDbWindowBytesStoreSupplier implements 
WindowBytesStoreSupplier
                         new WindowKeySchema()),
                     retainDuplicates,
                     windowSize);
+            case TIMESTAMPED_WINDOW_STORE_WITH_HEADERS:
+                return new RocksDBTimestampedWindowStoreWithHeaders(
+                    new RocksDBTimestampedSegmentedBytesStoreWithHeaders(
+                        name,
+                        metricsScope(),
+                        retentionPeriod,
+                        segmentInterval,
+                        new WindowKeySchema()),
+                    retainDuplicates,
+                    windowSize);
             default:
                 throw new IllegalArgumentException("invalid window store type: 
" + windowStoreType);
         }

Reply via email to