mjsax commented on code in PR #21794:
URL: https://github.com/apache/kafka/pull/21794#discussion_r2957530280
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
-  * Concrete implementation of IndexToBaseStoreIterator for window key schema.
-  * Converts index keys (key-first schema) to base store keys (time-first schema).
-  * <p>
-  * This can be reused by both window store implementations (with and without headers).
-  */
- class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
-     WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
-         super(indexIterator);
-     }
-
-     @Override
-     protected Bytes getBaseKey(final Bytes indexKey) {
-         final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-         final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-         final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-         return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum);
-     }
- }
-
    AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                  final long retention,
                                                  final KeySchema baseKeySchema,
                                                  final Optional<KeySchema> indexKeySchema,
                                                  final AbstractSegments<S> segments) {
        super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
Review Comment:
This is a refactoring to allow sharing more code. The different sub-classes
of `AbstractRocksDBTimeOrderedSegmentedBytesStore` all share a 99% common impl
of `Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[],
byte[]>> records)`, so I added this shared impl, which is "customized" via the
corresponding extractors.
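To illustrate the pattern, here is a minimal, self-contained sketch with stand-in types (the `Record` class, `buildBatches` name, and segmenting logic are simplifications, not the actual Kafka Streams code): one shared loop, with the schema-specific piece supplied as a `Function` by each "subclass":

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Simplified sketch of the shared getWriteBatches idea: one common loop,
// parameterized by per-schema extractor functions instead of a duplicated
// implementation in every subclass.
class SharedBatchSketch {

    // Stand-in for a changelog record (real code uses ConsumerRecord<byte[], byte[]>).
    record Record(long timestamp, String key) { }

    // The shared implementation: callers only supply the timestamp extractor,
    // e.g. a method reference like SessionKeySchema::extractEndTimestamp.
    static Map<Long, List<String>> buildBatches(
        final List<Record> records,
        final Function<Record, Long> timestampExtractor,
        final long segmentInterval
    ) {
        final Map<Long, List<String>> batches = new HashMap<>();
        for (final Record r : records) {
            // segment assignment is derived from the extracted timestamp
            final long segmentId = timestampExtractor.apply(r) / segmentInterval;
            batches.computeIfAbsent(segmentId, id -> new ArrayList<>()).add(r.key());
        }
        return batches;
    }
}
```

Each concrete store then becomes a thin wrapper that binds its own extractors, rather than repeating the loop.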
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
-  * Concrete implementation of IndexToBaseStoreIterator for window key schema.
-  * Converts index keys (key-first schema) to base store keys (time-first schema).
-  * <p>
-  * This can be reused by both window store implementations (with and without headers).
-  */
- class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
-     WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
-         super(indexIterator);
-     }
-
-     @Override
-     protected Bytes getBaseKey(final Bytes indexKey) {
-         final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-         final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-         final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-         return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum);
-     }
- }
-
    AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                  final long retention,
                                                  final KeySchema baseKeySchema,
                                                  final Optional<KeySchema> indexKeySchema,
                                                  final AbstractSegments<S> segments) {
        super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); //SessionKeySchema.extractEndTimestamp(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);
Review Comment:
This computation is `RocksDBTimeOrderedKeyValueBytesStore`-specific, but
it's cheap enough to just do it for all sub-classes now.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
-  * Concrete implementation of IndexToBaseStoreIterator for window key schema.
-  * Converts index keys (key-first schema) to base store keys (time-first schema).
-  * <p>
-  * This can be reused by both window store implementations (with and without headers).
-  */
- class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
-     WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
-         super(indexIterator);
-     }
-
-     @Override
-     protected Bytes getBaseKey(final Bytes indexKey) {
-         final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-         final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-         final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-         return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum);
-     }
- }
-
    AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                  final long retention,
                                                  final KeySchema baseKeySchema,
                                                  final Optional<KeySchema> indexKeySchema,
                                                  final AbstractSegments<S> segments) {
        super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); //SessionKeySchema.extractEndTimestamp(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }
+
+        final Map<S, WriteBatch> writeBatchMap = new HashMap<>();
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); //SessionKeySchema.extractEndTimestamp(record.key());
+            final long segmentId = segments.segmentId(timestamp);
+            final S segment = segments.getOrCreateSegmentIfLive(segmentId, internalProcessorContext, observedStreamTime);
+            if (segment != null) {
+                ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(
+                    record,
+                    consistencyEnabled,
+                    position
+                );
+                try {
+                    final WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
+
+                    // Assuming changelog record is serialized using SessionKeySchema
+                    // from ChangeLoggingSessionBytesStore. Reconstruct key/value to restore
+                    if (hasIndex()) {
Review Comment:
This part is only needed for "indexed stores" and thus does not apply to
`RocksDBTimeOrderedKeyValueBytesStore` -- again, it does not really add overhead
to `RocksDBTimeOrderedKeyValueBytesStore`, so it's ok IMHO.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractSegments.java:
##########
@@ -184,7 +184,7 @@ public void commit(final Map<TopicPartition, Long> changelogOffsets) {
}
}
- @SuppressWarnings("deprecation")
+ @Deprecated
Review Comment:
KIP-1035 side cleanup
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -59,9 +67,11 @@
* @see RocksDBTimeOrderedWindowSegmentedBytesStore
*/
public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);

-    abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
+    private long minTimestamp;
+
+    public abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
Review Comment:
Another side cleanup -- we use many of these classes outside of the
`internals` package, so we should just declare them `public`.
There are more changes like this in this PR -- if we think we don't want this
change, we can also revert it.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders.java:
##########
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.query.PositionBound;
+import org.apache.kafka.streams.query.Query;
+import org.apache.kafka.streams.query.QueryConfig;
+import org.apache.kafka.streams.query.QueryResult;
+
+/**
+ * A RocksDB-backed time-ordered segmented bytes store with headers support for session key schema.
+ * <p>
+ * This store extends {@link AbstractRocksDBTimeOrderedSegmentedBytesStore} and uses
+ * {@link SessionSegmentsWithHeaders} to manage segments with full header support,
+ * including Column Family management and lazy migration from legacy formats.
+ * <p>
+ * The store maintains a dual-schema architecture:
+ * <ul>
+ *     <li>Base store: Time-first session key schema for efficient time-range queries</li>
+ *     <li>Index store (optional): Key-first session key schema for efficient key-based queries</li>
+ * </ul>
+ * <p>
+ * Headers are managed at the segment level by {@link SessionSegmentWithHeaders}.
+ * <p>
+ * Value format (timestamps are in the key, not in the value):
+ * <ul>
+ *     <li>Old format: {@code [aggregationBytes]}</li>
+ *     <li>New format: {@code [headersSize(varint)][headersBytes][aggregationBytes]}</li>
+ * </ul>
+ *
+ * @see RocksDBTimeOrderedSessionStore
+ * @see SessionSegmentsWithHeaders
+ * @see SessionSegmentWithHeaders
+ */
+class RocksDBTimeOrderedSessionSegmentedBytesStoreWithHeaders
Review Comment:
Adding the new class we need is now trivial with the unified code in the
super-classes :)
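The value layout described in the new class's Javadoc can be sketched with a hypothetical encoder/decoder (the class and method names, and the unsigned-varint helper, are assumptions for illustration, not the PR's actual code):

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

// Sketch of the two value layouts from the Javadoc:
//   old format: [aggregationBytes]
//   new format: [headersSize(varint)][headersBytes][aggregationBytes]
class HeaderValueFormat {

    // Unsigned varint: 7 data bits per byte, high bit set on continuation bytes.
    static void writeVarint(final ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // New-format encoding: varint-length-prefixed headers, then the aggregation bytes.
    static byte[] encode(final byte[] headers, final byte[] aggregation) {
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, headers.length);
        out.writeBytes(headers);
        out.writeBytes(aggregation);
        return out.toByteArray();
    }

    // Decode a new-format value back into {headers, aggregation}.
    static byte[][] decode(final byte[] value) {
        int size = 0;
        int shift = 0;
        int pos = 0;
        while (true) {
            final byte b = value[pos++];
            size |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        final byte[] headers = Arrays.copyOfRange(value, pos, pos + size);
        final byte[] aggregation = Arrays.copyOfRange(value, pos + size, value.length);
        return new byte[][] {headers, aggregation};
    }
}
```

Because the headers size is a length prefix, an old-format value (no prefix) and a new-format value are distinguishable only by context, which is presumably why the segment handles lazy migration.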
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -59,9 +67,11 @@
* @see RocksDBTimeOrderedWindowSegmentedBytesStore
*/
public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);

-    abstract class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
+    private long minTimestamp;
Review Comment:
I unified more code (cf other comments) -- this is moved from
`RocksDBTimeOrderedKeyValueBytesStore` to here.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
-  * Concrete implementation of IndexToBaseStoreIterator for window key schema.
-  * Converts index keys (key-first schema) to base store keys (time-first schema).
-  * <p>
-  * This can be reused by both window store implementations (with and without headers).
-  */
- class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
-     WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
-         super(indexIterator);
-     }
-
-     @Override
-     protected Bytes getBaseKey(final Bytes indexKey) {
-         final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-         final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-         final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-         return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum);
-     }
- }
-
    AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                  final long retention,
                                                  final KeySchema baseKeySchema,
                                                  final Optional<KeySchema> indexKeySchema,
                                                  final AbstractSegments<S> segments) {
        super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(
+        final Collection<ConsumerRecord<byte[], byte[]>> records,
+        final Function<byte[], Long> timestampExtractor,
+        final Function<byte[], byte[]> indexedKeyExtractor,
+        final Function<byte[], byte[]> baseKeyExtractor
+    ) {
+        // advance stream time to the max timestamp in the batch
+        for (final ConsumerRecord<byte[], byte[]> record : records) {
+            final long timestamp = timestampExtractor.apply(record.key()); //SessionKeySchema.extractEndTimestamp(record.key());
+            minTimestamp = Math.min(minTimestamp, timestamp);
+            observedStreamTime = Math.max(observedStreamTime, timestamp);
+        }
Review Comment:
This loop was not in `RocksDBTimeOrderedKeyValueBytesStore`, but I believe
there was actually a small bug there... the original code in
`RocksDBTimeOrderedKeyValueBytesStore` did not compute `observedStreamTime`
upfront, but in the main loop, which seems incorrect?
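The difference can be illustrated with a toy model (the liveness check and method names below are simplifications for illustration, not the actual segment logic): because segment liveness depends on the current stream time, advancing stream time inside the main loop lets an already-expired record slip through before a later record in the same batch raises the stream time.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the suspected bug: "liveness" depends on observedStreamTime, so
// advancing it in-loop vs. upfront changes which records are accepted.
class StreamTimeAdvance {

    static boolean isLive(final long timestamp, final long streamTime, final long retention) {
        return timestamp > streamTime - retention;
    }

    // In-loop variant: stream time advances while records are processed, so an
    // early record is judged against a stale (smaller) stream time.
    static List<Long> keptInLoop(final long[] timestamps, final long retention) {
        long streamTime = Long.MIN_VALUE;
        final List<Long> kept = new ArrayList<>();
        for (final long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
            if (isLive(ts, streamTime, retention)) {
                kept.add(ts);
            }
        }
        return kept;
    }

    // Upfront variant: stream time is first advanced to the batch maximum, so
    // every record is judged against the same, final stream time.
    static List<Long> keptUpfront(final long[] timestamps, final long retention) {
        long streamTime = Long.MIN_VALUE;
        for (final long ts : timestamps) {
            streamTime = Math.max(streamTime, ts);
        }
        final List<Long> kept = new ArrayList<>();
        for (final long ts : timestamps) {
            if (isLive(ts, streamTime, retention)) {
                kept.add(ts);
            }
        }
        return kept;
    }
}
```

With timestamps `{5, 100}` and retention `10`, the in-loop variant keeps both records, while the upfront variant correctly expires the record at `5`.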
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
-  * Concrete implementation of IndexToBaseStoreIterator for window key schema.
-  * Converts index keys (key-first schema) to base store keys (time-first schema).
-  * <p>
-  * This can be reused by both window store implementations (with and without headers).
-  */
- class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
Review Comment:
This class belongs to `RocksDBTimeOrderedWindowSegmentedBytesStore`, so I
moved it there.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegment.java:
##########
@@ -156,7 +156,7 @@ public synchronized void close() {
iterators = new HashSet<>(openIterators);
openIterators.clear();
}
- if (iterators.size() != 0) {
+ if (!iterators.isEmpty()) {
Review Comment:
side cleanup
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -59,9 +67,11 @@
* @see RocksDBTimeOrderedWindowSegmentedBytesStore
*/
public abstract class AbstractRocksDBTimeOrderedSegmentedBytesStore<S extends Segment> extends AbstractDualSchemaRocksDBSegmentedBytesStore<S> {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractDualSchemaRocksDBSegmentedBytesStore.class);
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractRocksDBTimeOrderedSegmentedBytesStore.class);
Review Comment:
Side fix
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalKeyValueSegments.java:
##########
@@ -86,8 +86,8 @@ LogicalKeyValueSegment createReservedSegment(final long segmentId,
}
// VisibleForTesting
- LogicalKeyValueSegment getReservedSegment(final long segmentId) {
- return reservedSegments.get(segmentId);
+ LogicalKeyValueSegment getReservedSegment() {
+ return reservedSegments.get(-1L);
Review Comment:
Side cleanup -- we always pass `-1L`.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBSessionStoreWithHeaders.java:
##########
@@ -27,7 +28,7 @@
* RocksDB-backed session store with support for record headers.
* <p>
* This store extends {@link RocksDBSessionStore} and returns
- * {@link QueryResult#forUnknownQueryType(Query, Object)} for all queries,
+ * {@link QueryResult#forUnknownQueryType(Query, StateStore)} for all queries,
Review Comment:
Side fix (also elsewhere)
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedWindowStoreWithHeadersBuilder.java:
##########
@@ -115,9 +115,6 @@ private boolean isTimeOrderedStore(final StateStore stateStore) {
if (stateStore instanceof RocksDBTimeOrderedWindowStore) {
return true;
}
- if (stateStore instanceof RocksDBTimeOrderedWindowStoreWithHeaders) {
Review Comment:
`RocksDBTimeOrderedWindowStoreWithHeaders` extends
`RocksDBTimeOrderedWindowStore` so this second check is unnecessary.
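A toy example (hypothetical classes, not the real store hierarchy) of why the second check is redundant: `instanceof` against a superclass already matches every subclass instance.

```java
// Minimal illustration: a single instanceof check against the superclass
// covers all subclasses, so a separate subclass check adds nothing.
class InstanceOfDemo {
    static class WindowStore { }
    static class WindowStoreWithHeaders extends WindowStore { }

    static boolean isTimeOrderedStore(final Object stateStore) {
        // matches WindowStore and every subclass, including WindowStoreWithHeaders
        return stateStore instanceof WindowStore;
    }
}
```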
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBTimeOrderedWindowStore.java:
##########
@@ -35,8 +35,8 @@
import java.util.Objects;
-public class RocksDBTimeOrderedWindowStore
-    extends WrappedStateStore<AbstractRocksDBTimeOrderedSegmentedBytesStore<? extends Segment>, Object, Object>
+public class RocksDBTimeOrderedWindowStore<S extends Segment>
Review Comment:
This the the follow up from https://github.com/apache/kafka/pull/21780
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]