mjsax commented on code in PR #21794:
URL: https://github.com/apache/kafka/pull/21794#discussion_r2957530280


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
-     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
-     * <p>
-     * This can be reused by both window store implementations (with and 
without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
-        }
-    }
-
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                   final long retention,
                                                   final KeySchema 
baseKeySchema,
                                                   final Optional<KeySchema> 
indexKeySchema,
                                                   final AbstractSegments<S> 
segments) {
         super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(

Review Comment:
   This is a refactoring to allow sharing more code. The different sub-classes 
of `AbstractRocksDBTimeOrderedSegmentedBytesStore` all share a 99% common impl 
of `Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> records)`, 
so I added this shared impl, which is "customized" via the corresponding 
extractors.
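
To illustrate the idea of one shared implementation that is customized via extractor functions, here is a minimal, self-contained sketch. It is not the Kafka Streams code: `SharedBatchingSketch`, `groupBySegment`, the fixed `SEGMENT_INTERVAL_MS`, and the toy key layout (8-byte timestamp followed by the raw key) are all assumptions made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for the shared restore logic; not the Kafka Streams classes.
final class SharedBatchingSketch {
    static final long SEGMENT_INTERVAL_MS = 100L; // assumed segment width

    // Shared implementation: callers differ only in the extractors they pass in.
    static Map<Long, List<byte[]>> groupBySegment(
        final Collection<byte[]> recordKeys,
        final Function<byte[], Long> timestampExtractor,
        final Function<byte[], byte[]> baseKeyExtractor
    ) {
        final Map<Long, List<byte[]>> batches = new HashMap<>();
        for (final byte[] key : recordKeys) {
            final long segmentId = timestampExtractor.apply(key) / SEGMENT_INTERVAL_MS;
            batches.computeIfAbsent(segmentId, id -> new ArrayList<>())
                   .add(baseKeyExtractor.apply(key));
        }
        return batches;
    }

    // Toy key layout (assumption): 8-byte big-endian timestamp, then the raw key bytes.
    static byte[] timeFirstKey(final long timestamp, final String rawKey) {
        final byte[] raw = rawKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Long.BYTES + raw.length).putLong(timestamp).put(raw).array();
    }

    static long extractTimestamp(final byte[] key) {
        return ByteBuffer.wrap(key).getLong();
    }

    public static void main(final String[] args) {
        final List<byte[]> keys = List.of(
            timeFirstKey(10L, "a"), timeFirstKey(150L, "b"), timeFirstKey(199L, "c"));
        final Map<Long, List<byte[]>> batches =
            groupBySegment(keys, SharedBatchingSketch::extractTimestamp, Function.identity());
        // Segment 0 holds one key, segment 1 holds two.
        System.out.println(batches.get(0L).size() + " " + batches.get(1L).size()); // prints "1 2"
    }
}
```

A store with a different key schema would reuse `groupBySegment` unchanged and only supply its own timestamp and key extractors.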



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
-     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
-     * <p>
-     * This can be reused by both window store implementations (with and 
without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
-        }
-    }
-
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                   final long retention,
                                                   final KeySchema 
baseKeySchema,
                                                   final Optional<KeySchema> 
indexKeySchema,
                                                   final AbstractSegments<S> 
segments) {
         super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); 
//SessionKeySchema.extractEndTimestamp(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);

Review Comment:
   This computation is `RocksDBTimeOrderedKeyValueBytesStore` specific, but 
it's cheap enough to just do it for all sub-classes now.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
-     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
-     * <p>
-     * This can be reused by both window store implementations (with and 
without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
-        }
-    }
-
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                   final long retention,
                                                   final KeySchema 
baseKeySchema,
                                                   final Optional<KeySchema> 
indexKeySchema,
                                                   final AbstractSegments<S> 
segments) {
         super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); 
//SessionKeySchema.extractEndTimestamp(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }
+
+        final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); 
//SessionKeySchema.extractEndTimestamp(record.key());
+            final long segmentId = segments.segmentId(timestamp);
+            final S segment = segments.getOrCreateSegmentIfLive(segmentId, 
internalProcessorContext, observedStreamTime);
+            if (segment != null) {
+                
ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+                    record,
+                    consistencyEnabled,
+                    position
+                );
+                try {
+                    final WriteBatch batch = 
writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+
+                    // Assuming changelog record is serialized using 
SessionKeySchema
+                    // from ChangeLoggingSessionBytesStore. Reconstruct 
key/value to restore
+                    if (hasIndex()) {

Review Comment:
   This part is only needed for "indexed stores" and thus does not apply to 
`RocksDBTimeOrderedKeyValueBytesStore` -- again, it does not really add overhead 
to `RocksDBTimeOrderedKeyValueBytesStore`, so ok IMHO.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -184,7 +184,7 @@ public void commit(final Map<TopicPartition, Long> 
changelogOffsets) {
         }
     }
 
-    @SuppressWarnings("deprecation")
+    @Deprecated

Review Comment:
   KIP-1035 side cleanup



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -59,9 +67,11 @@
  * @see RocksDBTimeOrderedWindowSegmentedBytesStore
  */
 public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends 
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);
 
-    abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, 
byte[]> {
+    private long minTimestamp;
+
+    public abstract class IndexToBaseStoreIterator implements 
KeyValueIterator<Bytes, byte[]> {

Review Comment:
   Another side cleanup -- we use many of these classes outside of the 
`internals` package, so we should just declare them `public`.
   
   There are more changes like this in this PR -- if we think we don't want 
this change, we can also revert it.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+
+/**
+ * A RocksDB-backed time-ordered segmented bytes store with headers support 
for session key schema.
+ * <p>
+ * This store extends {@link AbstractRocksDBTimeOrderedSegmentedBytesStore} 
and uses
+ * {@link SessionSegmentsWithHeaders} to manage segments with full header 
support,
+ * including Column Family management and lazy migration from legacy formats.
+ * <p>
+ * The store maintains a dual-schema architecture:
+ * <ul>
+ *   <li>Base store: Time-first session key schema for efficient time-range 
queries</li>
+ *   <li>Index store (optional): Key-first session key schema for efficient 
key-based queries</li>
+ * </ul>
+ * <p>
+ * Headers are managed at the segment level by {@link 
SessionSegmentWithHeaders}.
+ * <p>
+ * Value format (timestamps are in the key, not in the value):
+ * <ul>
+ *   <li>Old format: {@code [aggregationBytes]}</li>
+ *   <li>New format: {@code 
[headersSize(varint)][headersBytes][aggregationBytes]}</li>
+ * </ul>
+ *
+ * @see RocksDBTimeOrderedSessionStore
+ * @see SessionSegmentsWithHeaders
+ * @see SessionSegmentWithHeaders
+ */
+class RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders

Review Comment:
   Adding the new class we need is now trivial with the unified code in the 
super-classes :) 



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -59,9 +67,11 @@
  * @see RocksDBTimeOrderedWindowSegmentedBytesStore
  */
 public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends 
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);
 
-    abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, 
byte[]> {
+    private long minTimestamp;

Review Comment:
   I unified more code (cf. other comments) -- this is moved from 
`RocksDBTimeOrderedKeyValueBytesStore` to here



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
-     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
-     * <p>
-     * This can be reused by both window store implementations (with and 
without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, 
byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = 
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = 
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = 
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, 
timestamp, seqnum);
-        }
-    }
-
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                   final long retention,
                                                   final KeySchema 
baseKeySchema,
                                                   final Optional<KeySchema> 
indexKeySchema,
                                                   final AbstractSegments<S> 
segments) {
         super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); 
//SessionKeySchema.extractEndTimestamp(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }

Review Comment:
   This loop was not in `RocksDBTimeOrderedKeyValueBytesStore`, but I believe 
it was actually a small bug there... the original code in 
`RocksDBTimeOrderedKeyValueBytesStore` did not compute `observedStreamTime` 
upfront, but in the main loop, which seems incorrect?
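
To make the difference concrete, here is a minimal sketch comparing the two orderings. It is a simplification, not the store's code: `StreamTimeOrderSketch`, `RETENTION_MS`, and the per-record `isLive` check are assumptions for illustration (the real liveness decision is made per segment by `getOrCreateSegmentIfLive`).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of restoring a batch with a retention-based liveness check.
final class StreamTimeOrderSketch {
    static final long RETENTION_MS = 100L; // assumed retention period

    // Simplified liveness rule: a record is live if it is within retention of stream time.
    static boolean isLive(final long timestamp, final long observedStreamTime) {
        return timestamp >= observedStreamTime - RETENTION_MS;
    }

    // Stream time is advanced to the batch maximum before any record is checked.
    static List<Long> restoreUpfront(final List<Long> timestamps) {
        long observedStreamTime = -1L;
        for (final long ts : timestamps) {
            observedStreamTime = Math.max(observedStreamTime, ts);
        }
        final List<Long> kept = new ArrayList<>();
        for (final long ts : timestamps) {
            if (isLive(ts, observedStreamTime)) {
                kept.add(ts);
            }
        }
        return kept;
    }

    // Stream time is advanced record by record inside the main loop.
    static List<Long> restoreInLoop(final List<Long> timestamps) {
        long observedStreamTime = -1L;
        final List<Long> kept = new ArrayList<>();
        for (final long ts : timestamps) {
            observedStreamTime = Math.max(observedStreamTime, ts);
            if (isLive(ts, observedStreamTime)) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(final String[] args) {
        final List<Long> batch = List.of(10L, 500L);
        System.out.println(restoreUpfront(batch)); // prints "[500]"
        System.out.println(restoreInLoop(batch));  // prints "[10, 500]"
    }
}
```

Under these assumptions, the in-loop variant keeps the record at timestamp 10 even though the same batch already moves stream time to 500, which puts that record outside retention.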



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key 
schema.
-     * Converts index keys (key-first schema) to base store keys (time-first 
schema).
-     * <p>
-     * This can be reused by both window store implementations (with and 
without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends 
IndexToBaseStoreIterator {

Review Comment:
   This class belongs to `RocksDBTimeOrderedWindowSegmentedBytesStore` -- moved 
it there



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -156,7 +156,7 @@ public synchronized void close() {
             iterators = new HashSet<>(openIterators);
             openIterators.clear();
         }
-        if (iterators.size() != 0) {
+        if (!iterators.isEmpty()) {

Review Comment:
   side cleanup



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -59,9 +67,11 @@
  * @see RocksDBTimeOrderedWindowSegmentedBytesStore
  */
 public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends 
Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);

Review Comment:
   Side fix



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##########
@@ -86,8 +86,8 @@ LogicalKeyValueSegment createReservedSegment(final long 
segmentId,
     }
 
     // VisibleForTesting
-    LogicalKeyValueSegment getReservedSegment(final long segmentId) {
-        return reservedSegments.get(segmentId);
+    LogicalKeyValueSegment getReservedSegment() {
+        return reservedSegments.get(-1L);

Review Comment:
   Side cleanup -- we always pass `-1L`.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java:
##########
@@ -27,7 +28,7 @@
  * RocksDB-backed session store with support for record headers.
  * <p>
  * This store extends {@link RocksDBSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,

Review Comment:
   Side fix (also elsewhere)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java:
##########
@@ -115,9 +115,6 @@ private boolean isTimeOrderedStore(final StateStore 
stateStore) {
         if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
             return true;
         }
-        if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {

Review Comment:
   `RocksDBTimeOrderedWindowStoreWithHeaders` extends 
`RocksDBTimeOrderedWindowStore` so this second check is unnecessary.
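
A small sketch of why the second check is redundant: `instanceof` against a super-class already matches every sub-class instance. The class names below are hypothetical stand-ins, not the Kafka Streams types.

```java
// Hypothetical class hierarchy mirroring "store" and "store with headers".
final class InstanceofSketch {
    static class TimeOrderedStore { }

    static class TimeOrderedStoreWithHeaders extends TimeOrderedStore { }

    // A single check against the super-class covers the sub-class as well.
    static boolean isTimeOrdered(final Object store) {
        return store instanceof TimeOrderedStore;
    }

    public static void main(final String[] args) {
        System.out.println(isTimeOrdered(new TimeOrderedStore()));            // prints "true"
        System.out.println(isTimeOrdered(new TimeOrderedStoreWithHeaders())); // prints "true"
        System.out.println(isTimeOrdered("not a store"));                     // prints "false"
    }
}
```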



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java:
##########
@@ -35,8 +35,8 @@
 import java.util.Objects;
 
 
-public class RocksDBTimeOrderedWindowStore
-    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? 
extends Segment>, Object, Object>
+public class RocksDBTimeOrderedWindowStore<S extends Segment>

Review Comment:
   This is the follow-up from https://github.com/apache/kafka/pull/21780



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
