mjsax commented on code in PR #21794:
URL: https://github.com/apache/kafka/pull/21794#discussion_r2957530280


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBTimeOrderedSegmentedBytesStore.java:
##########
@@ -115,32 +125,64 @@ public KeyValue<Bytes, byte[]> next() {
         protected abstract Bytes getBaseKey(final Bytes indexKey);
     }
 
-    /**
-     * Concrete implementation of IndexToBaseStoreIterator for window key schema.
-     * Converts index keys (key-first schema) to base store keys (time-first schema).
-     * <p>
-     * This can be reused by both window store implementations (with and without headers).
-     */
-    class WindowKeySchemaIndexToBaseStoreIterator extends IndexToBaseStoreIterator {
-        WindowKeySchemaIndexToBaseStoreIterator(final KeyValueIterator<Bytes, byte[]> indexIterator) {
-            super(indexIterator);
-        }
-
-        @Override
-        protected Bytes getBaseKey(final Bytes indexKey) {
-            final byte[] keyBytes = KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
-            final long timestamp = KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
-            final int seqnum = KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
-            return TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum);
-        }
-    }
-
     AbstractRocksDBTimeOrderedSegmentedBytesStore(final String name,
                                                   final long retention,
                                                   final KeySchema baseKeySchema,
                                                   final Optional<KeySchema> indexKeySchema,
                                                   final AbstractSegments<S> segments) {
         super(name, baseKeySchema, indexKeySchema, segments, retention);
+
+        minTimestamp = Long.MAX_VALUE;
+    }
+
+    Map<S, WriteBatch> getWriteBatches(

Review Comment:
   This is a refactoring to share more code. The sub-classes of
`AbstractRocksDBTimeOrderedSegmentedBytesStore` all implement
`Map<S, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[],
byte[]>> records)` in a way that is about 99% identical, so I added this shared
implementation, which each sub-class "customizes" by passing in the
corresponding extractors.
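
   For readers without the full diff, the general pattern looks roughly like the sketch below. It is only an illustration under stated assumptions, not the code in this PR: `SharedBatchingSketch`, `RawRecord`, `groupBySegment`, and the two key layouts are hypothetical names, plain lists stand in for `WriteBatch`, and a `long` segment id stands in for the segment type `S`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToLongFunction;

public class SharedBatchingSketch {

    // Hypothetical stand-in for ConsumerRecord<byte[], byte[]>.
    static final class RawRecord {
        final byte[] key;
        final byte[] value;

        RawRecord(final byte[] key, final byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    // Shared implementation: callers differ only in the extractor they pass in.
    // Assumes non-negative timestamps and a positive segment interval.
    static Map<Long, List<RawRecord>> groupBySegment(
            final Collection<RawRecord> records,
            final ToLongFunction<byte[]> timestampExtractor,
            final long segmentIntervalMs) {
        final Map<Long, List<RawRecord>> batches = new TreeMap<>();
        for (final RawRecord record : records) {
            final long timestamp = timestampExtractor.applyAsLong(record.key);
            final long segmentId = timestamp / segmentIntervalMs;
            batches.computeIfAbsent(segmentId, id -> new ArrayList<>()).add(record);
        }
        return batches;
    }

    // Hypothetical layout 1: 8-byte timestamp followed by the user key.
    static byte[] timeFirstKey(final long timestamp, final byte[] userKey) {
        return ByteBuffer.allocate(Long.BYTES + userKey.length)
            .putLong(timestamp).put(userKey).array();
    }

    static long extractTimeFirst(final byte[] key) {
        return ByteBuffer.wrap(key).getLong(0);
    }

    // Hypothetical layout 2: user key followed by an 8-byte timestamp.
    static byte[] keyFirstKey(final long timestamp, final byte[] userKey) {
        return ByteBuffer.allocate(userKey.length + Long.BYTES)
            .put(userKey).putLong(timestamp).array();
    }

    static long extractKeyFirst(final byte[] key) {
        return ByteBuffer.wrap(key).getLong(key.length - Long.BYTES);
    }
}
```

   A caller with the time-first layout would pass `SharedBatchingSketch::extractTimeFirst`, and one with the key-first layout would pass `SharedBatchingSketch::extractKeyFirst`; the grouping loop itself is written once.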



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
