mjsax commented on code in PR #21794:
URL: https://github.com/apache/kafka/pull/21794#discussion_r2957530280
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
protected abstract Bytes getBaseKey(final Bytes indexKey);
}
- /**
- * Concrete implementation of IndexToBaseStoreIterator for window key
schema.
- * Converts index keys (key-first schema) to base store keys (time-first
schema).
- * <p>
- * This can be reused by both window store implementations (with and
without headers).
- */
- class WindowKeySchemaIndexToBaseStoreIterator extends
IndexToBaseStoreIterator {
- WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes,
byte[]> indexIterator) {
- super(indexIterator);
- }
-
- @Override
- protected Bytes getBaseKey(final Bytes indexKey) {
- final byte[] keyBytes =
KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
- final long timestamp =
KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
- final int seqnum =
KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
- return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes,
timestamp, seqnum);
- }
- }
-
AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
final long retention,
final KeySchema
baseKeySchema,
final Optional<KeySchema>
indexKeySchema,
final AbstractSegments<S>
segments) {
super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+ minTimestamp = Long.MAX_VALUE;
+ }
+
+ Map<S, WriteBatch> getWriteBatches(
Review Comment:
This is a refactoring to allow sharing more code. The different sub-classes
of `AbstractRocksDBTimeOrderedSegmentedBytesStore` all share a 99% common impl
of `Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[],
byte[]>> records)`, so I add this shared impl which is "customized" by passing
corresponding extractors.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]