mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420058690


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -69,23 +76,50 @@ public boolean hasNext() {
         if (!open) {
             throw new IllegalStateException("The iterator is out of scope.");
         }
-        // since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-        return hasStillLoad || maybeFillIterator();
+        if (this.next != null) {
+            return true;
+        }
+
+        while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+            boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+            if (!hasSegmentValue) {
+                hasSegmentValue = maybeFillCurrentSegmentValue();
+            }
+            if (hasSegmentValue) {
+                this.next  = (VersionedRecord) getNextRecord();
+                if (this.next == null) {
+                    prepareToFetchNextSegment();
+                }
+            }
+        }
+        return this.next != null;
     }
 
+    @SuppressWarnings("unchecked")

Review Comment:
   We can avoid this suppression by fixing generic types. This class should be 
defined as
   ```
   public class LogicalSegmentIterator implements 
VersionedRecordIterator<byte[]> {
   ```
   (i.e., add the type argument `byte[]`)
   
   This allows us to change the return type of `next()`:
   ```
   public VersionedRecord<byte[]> next() {
   ```
   which makes the suppression unnecessary. We can also use the fully typed
   `VersionedRecord<byte[]>` throughout this class (instead of the untyped
   `VersionedRecord`) and avoid the casts.
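
A minimal sketch of the typing change described above. `RecordSketch` and `RecordIteratorSketch` are hypothetical, simplified stand-ins for `VersionedRecord` and `VersionedRecordIterator` (the real Kafka Streams types have more members). It only illustrates that binding the type argument on the `implements` clause lets `next()` return a typed record with no cast and no `@SuppressWarnings`.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical, simplified stand-in for VersionedRecord<V>; illustration only.
final class RecordSketch<V> {
    private final V value;
    private final long timestamp;

    RecordSketch(final V value, final long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    V value() {
        return value;
    }

    long timestamp() {
        return timestamp;
    }
}

// Hypothetical stand-in for VersionedRecordIterator<V>.
interface RecordIteratorSketch<V> extends Iterator<RecordSketch<V>> { }

// Binding the type argument here fixes the element type for the whole class.
final class TypedIteratorSketch implements RecordIteratorSketch<byte[]> {
    private final Iterator<RecordSketch<byte[]>> delegate;

    TypedIteratorSketch(final List<RecordSketch<byte[]>> records) {
        this.delegate = records.iterator();
    }

    @Override
    public boolean hasNext() {
        return delegate.hasNext();
    }

    // Typed return value: no cast and no unchecked warning.
    @Override
    public RecordSketch<byte[]> next() {
        if (!delegate.hasNext()) {
            throw new NoSuchElementException();
        }
        return delegate.next();
    }
}

class TypedIteratorDemo {
    static int firstValueLength() {
        final RecordSketch<byte[]> record = new RecordSketch<>(new byte[] {1, 2, 3}, 10L);
        final TypedIteratorSketch it = new TypedIteratorSketch(Collections.singletonList(record));
        return it.next().value().length; // value() is byte[] without a cast
    }

    public static void main(final String[] args) {
        System.out.println(firstValueLength());
    }
}
```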



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -69,23 +76,50 @@ public boolean hasNext() {
         if (!open) {
             throw new IllegalStateException("The iterator is out of scope.");
         }
-        // since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-        return hasStillLoad || maybeFillIterator();
+        if (this.next != null) {
+            return true;
+        }
+
+        while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+            boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+            if (!hasSegmentValue) {
+                hasSegmentValue = maybeFillCurrentSegmentValue();
+            }
+            if (hasSegmentValue) {
+                this.next  = (VersionedRecord) getNextRecord();

Review Comment:
   Remove cast (should not be necessary... cf other comments)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +134,49 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key 
and time range
-                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =
-                            
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-                    for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-                        queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = 
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
+                    this.minTimestamp = 
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue);
+                    this.nextTimestamp = 
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
                 }
-            }
-            if (!queryResults.isEmpty()) {
-                break;
+                return true;
             }
         }
-        if (!queryResults.isEmpty()) {
-            // since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-            this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-            return true;
-        }
         // if all segments have been processed, release the snapshot
         releaseSnapshot();
         return false;
     }
 
+    private Object getNextRecord() {
+        VersionedRecord nextRecord = null;
+        if (currentRawSegmentValue != null) { // this is the latestValueStore
+            final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+            if (recordTimestamp <= toTime) {
+                final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+                // latest value satisfies timestamp bound
+                nextRecord = new VersionedRecord<>(value, recordTimestamp);
+            }
+        } else {
+            final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+                    currentDeserializedSegmentValue.find(fromTime, toTime, 
order);
+            if (currentRecord != null) {
+                nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+            }
+        }
+        // no relevant record can be found in the segment
+        if (currentRawSegmentValue != null || nextRecord == null || 
!canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), 
Long.parseLong(nextRecord.validTo().get().toString()))) {

Review Comment:
   `Long.parseLong(nextRecord.validTo().get().toString())` -- why do we get
   valid-to as a `String` and pipe it through `parseLong`? Why can't we just pass
   `nextRecord.validTo().get()` into `canSegmentHaveMoreRelevantRecords(...)`?
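
One plausible explanation for the string round trip, sketched under assumptions: when a generic class is used as a raw type, the signatures of its members are erased too, so an `Optional<Long>` accessor is seen as a raw `Optional` whose `get()` returns `Object`. `ValidToSketch` and `isBefore` below are hypothetical stand-ins for `VersionedRecord` and `canSegmentHaveMoreRelevantRecords(...)`. This is not taken from the PR.

```java
import java.util.Optional;

// Hypothetical stand-in for VersionedRecord<V>; only the members needed here.
final class ValidToSketch<V> {
    private final V value;
    private final Optional<Long> validTo;

    ValidToSketch(final V value, final long validTo) {
        this.value = value;
        this.validTo = Optional.of(validTo);
    }

    V value() {
        return value;
    }

    Optional<Long> validTo() {
        return validTo;
    }
}

class ValidToDemo {
    // Hypothetical stand-in for a method that expects a primitive long.
    static boolean isBefore(final long validTo, final long bound) {
        return validTo < bound;
    }

    @SuppressWarnings("rawtypes")
    static long viaRawType(final ValidToSketch record) {
        // On a raw type, validTo() is a raw Optional and get() yields Object,
        // so a long is only reachable through a cast or a string round trip.
        final Object validTo = record.validTo().get();
        return Long.parseLong(validTo.toString());
    }

    static long viaTypedRecord(final ValidToSketch<byte[]> record) {
        // On the parameterized type, get() yields Long and unboxes directly.
        return record.validTo().get();
    }

    public static void main(final String[] args) {
        final ValidToSketch<byte[]> record = new ValidToSketch<>(new byte[0], 42L);
        System.out.println(isBefore(viaTypedRecord(record), 100L));
    }
}
```

If that is indeed the cause, fixing the generic types as suggested in the other comments removes the need for `parseLong` as a side effect.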



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
         }
 
         @Override
-        public List<SegmentSearchResult> findAll(final long fromTime, final 
long toTime) {
-            long currNextTimestamp = nextTimestamp;
-            final List<SegmentSearchResult> segmentSearchResults = new 
ArrayList<>();
-            long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-            int currValueSize;
-            int currIndex = 0;
-            int cumValueSize = 0;
-            while (currTimestamp != minTimestamp) {
-                final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+            // this segment does not have any record in query specified time 
range
+            if (toTime < minTimestamp || fromTime > nextTimestamp) {
+                return null;
+            }
+            long currTimestamp = -1;
+            long currNextTimestamp = -1;
+
+
+            if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+                findValuesStartingIndex();
+            }
+
+            while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+                final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
                 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-                currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-                cumValueSize += Math.max(currValueSize, 0);
+                currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+                                                                               
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));

Review Comment:
   Cf. other comment below: can we avoid parsing to get back the long, and
   instead get it from a cache?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue 
implements SegmentValue {
         private int deserIndex = -1; // index up through which this segment 
has been deserialized (inclusive)
         private List<TimestampAndValueSize> 
unpackedReversedTimestampAndValueSizes;
         private List<Integer> cumulativeValueSizes; // ordered same as 
timestamp and value sizes (reverse time-sorted)
+        private int valuesStartingIndex = -1; // the index of first value in 
the segment
+        private int currentCumValueSize = 0; // this is for deserializing a 
segment records in a lazy manner
+        private int currentDeserIndex = 0; // this is for deserializing a 
segment records in a lazy manner

Review Comment:
   `PartiallyDeserializedSegmentValue` already has `cumulativeValueSizes` and
   `deserIndex` because it is already lazy, but it only supports deserialization
   in one direction (I believe descending time, i.e., ascending order inside the
   `byte[] segmentValue`).
   
   Why do we need to duplicate both variables? Can't we reuse the existing ones
   (of course, we might need to add an `Order` member to track in which direction
   we intend to deserialize)?
   
   I don't want to make it unnecessarily complex, so if this idea makes the code
   more difficult, I am happy to keep the new vars, but it seems we might want to
   change their names to indicate when (ascending vs descending) they are used?
   
   



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -69,23 +76,50 @@ public boolean hasNext() {
         if (!open) {
             throw new IllegalStateException("The iterator is out of scope.");
         }
-        // since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-        return hasStillLoad || maybeFillIterator();
+        if (this.next != null) {
+            return true;
+        }
+
+        while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+            boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+            if (!hasSegmentValue) {
+                hasSegmentValue = maybeFillCurrentSegmentValue();
+            }
+            if (hasSegmentValue) {
+                this.next  = (VersionedRecord) getNextRecord();
+                if (this.next == null) {
+                    prepareToFetchNextSegment();
+                }
+            }
+        }
+        return this.next != null;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public Object next() {
-        if (hasNext()) {
-            // since data is stored in descending order in the segments, 
retrieve previous record, if the order is Ascending.
-            return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+        if (this.next == null) {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
         }
-        throw new NoSuchElementException();
+        final VersionedRecord clonedNext = this.next.validTo().isPresent() ? 
new VersionedRecord(this.next.value(), this.next.timestamp(), 
Long.parseLong(this.next.validTo().get().toString()))

Review Comment:
   Add missing generic type `<byte[]>` to `VersionedRecord` (I am sure there are
   more of these -- please address everywhere in this class)
   
   Also: why do we need to clone the object? Can't we just pass the object
   reference?
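
A minimal sketch of the look-ahead pattern without cloning. It assumes callers treat the returned record as immutable and that the source never yields `null`. `LookaheadSketch` is a hypothetical generic wrapper, not code from the PR. `next()` hands out the buffered reference itself and clears the buffer so that the following `hasNext()` advances.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical look-ahead wrapper; names are illustrative, not from the PR.
final class LookaheadSketch<T> implements Iterator<T> {
    private final Iterator<T> source;
    private T next; // buffered element; null means nothing is buffered

    LookaheadSketch(final Iterator<T> source) {
        this.source = source;
    }

    @Override
    public boolean hasNext() {
        if (next == null && source.hasNext()) {
            next = source.next(); // assumes the source never yields null
        }
        return next != null;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final T result = next; // hand out the buffered reference itself
        next = null;           // clear the buffer so the next call advances
        return result;
    }
}

class LookaheadDemo {
    // Returns true if next() hands back the very objects that were buffered.
    static boolean returnsSameReference() {
        final Object first = new Object();
        final Object second = new Object();
        final LookaheadSketch<Object> it =
            new LookaheadSketch<>(Arrays.asList(first, second).iterator());
        it.hasNext(); // buffers `first`
        return it.next() == first && it.next() == second && !it.hasNext();
    }

    public static void main(final String[] args) {
        System.out.println(returnsSameReference());
    }
}
```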



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -69,23 +76,50 @@ public boolean hasNext() {
         if (!open) {
             throw new IllegalStateException("The iterator is out of scope.");
         }
-        // since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-        return hasStillLoad || maybeFillIterator();
+        if (this.next != null) {
+            return true;
+        }
+
+        while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+            boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+            if (!hasSegmentValue) {
+                hasSegmentValue = maybeFillCurrentSegmentValue();
+            }
+            if (hasSegmentValue) {
+                this.next  = (VersionedRecord) getNextRecord();
+                if (this.next == null) {
+                    prepareToFetchNextSegment();
+                }
+            }
+        }
+        return this.next != null;
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public Object next() {

Review Comment:
   Should return `VersionedRecord<byte[]>` instead of `Object`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -150,7 +152,15 @@ interface SegmentValue {
          */
         SegmentSearchResult find(long timestamp, boolean includeValue);
 
-        List<SegmentSearchResult> findAll(long fromTime, long toTime);
+        /**
+         * Finds the next/previous record in this segment row that is valid 
within the query specified time range
+         *
+         * @param fromTime the query starting time point
+         * @param toTime the query ending time point
+         * @param order specifies the order (based on timestamp) in which 
records are returned
+         * @return the record that is found, null if no record is found
+         */
+        SegmentSearchResult find(long fromTime, long toTime, ResultOrder 
order);

Review Comment:
   I am wondering if the existing `find()` and the new `find()` can be called
   interleaved without breaking anything?
   
   So far, `PartiallyDeserializedSegmentValue` is already lazy and deserializes
   from newest to oldest. It would be a difficult API contract if interleaving
   calls to the existing `find` and the new `find` could break anything.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatterTest.java:
##########
@@ -215,40 +219,83 @@ public void shouldFindByTimestamp() {
             assertThrows(IllegalArgumentException.class, () -> 
segmentValue.find(testCase.minTimestamp - 1, false));
         }
 
+        @SuppressWarnings("checkstyle:all")

Review Comment:
   Please clean up the code so we can avoid this suppression.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +134,49 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key 
and time range
-                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =
-                            
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-                    for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-                        queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = 
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
+                    this.minTimestamp = 
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue);

Review Comment:
   `currentDeserializedSegmentValue` should have both `minTimestamp` and
   `nextTimestamp` already deserialized -- can we avoid parsing the bytes a
   second time, and just get both from `currentDeserializedSegmentValue`? (It's all
   internal classes, so I don't see a reason why we could not add a "getter" for
   each to `SegmentValue`.)
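
A rough sketch of the getter idea, under a stated assumption about the layout: the segment value starts with `nextTimestamp` at offset 0 followed by `minTimestamp` at offset 8, which matches the `2 * TIMESTAMP_SIZE` header offset used in the diff. `SegmentHeaderSketch` and its method names are hypothetical. The header is parsed once on construction and then exposed, so the iterator would not need to touch the raw bytes again.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; the real SegmentValue interface has other members.
final class SegmentHeaderSketch {
    private static final int TIMESTAMP_SIZE = 8;

    private final long nextTimestamp;
    private final long minTimestamp;

    SegmentHeaderSketch(final byte[] segmentValue) {
        // Assumed layout: [nextTimestamp (8 bytes)][minTimestamp (8 bytes)]...
        final ByteBuffer buffer = ByteBuffer.wrap(segmentValue);
        this.nextTimestamp = buffer.getLong(0);
        this.minTimestamp = buffer.getLong(TIMESTAMP_SIZE);
    }

    // Getters expose the values that were parsed exactly once.
    long nextTimestamp() {
        return nextTimestamp;
    }

    long minTimestamp() {
        return minTimestamp;
    }

    // Test helper: builds a header-only segment value.
    static byte[] header(final long nextTimestamp, final long minTimestamp) {
        return ByteBuffer.allocate(2 * TIMESTAMP_SIZE)
            .putLong(nextTimestamp)
            .putLong(minTimestamp)
            .array();
    }

    public static void main(final String[] args) {
        final SegmentHeaderSketch segment = new SegmentHeaderSketch(header(200L, 100L));
        System.out.println(segment.minTimestamp() + " .. " + segment.nextTimestamp());
    }
}
```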



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
         }
 
         @Override
-        public List<SegmentSearchResult> findAll(final long fromTime, final 
long toTime) {
-            long currNextTimestamp = nextTimestamp;
-            final List<SegmentSearchResult> segmentSearchResults = new 
ArrayList<>();
-            long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-            int currValueSize;
-            int currIndex = 0;
-            int cumValueSize = 0;
-            while (currTimestamp != minTimestamp) {
-                final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+            // this segment does not have any record in query specified time 
range
+            if (toTime < minTimestamp || fromTime > nextTimestamp) {
+                return null;
+            }
+            long currTimestamp = -1;
+            long currNextTimestamp = -1;
+
+
+            if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+                findValuesStartingIndex();
+            }
+
+            while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+                final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
                 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-                currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-                cumValueSize += Math.max(currValueSize, 0);
+                currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+                                                                               
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+                final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
                 if (currValueSize >= 0) {
                     final byte[] value = new byte[currValueSize];
-                    final int valueSegmentIndex = segmentValue.length - 
cumValueSize;
+                    final int valueSegmentIndex = getValueSegmentIndex(order, 
currentCumValueSize, currValueSize);
                     System.arraycopy(segmentValue, valueSegmentIndex, value, 
0, currValueSize);
                     if (currTimestamp <= toTime && currNextTimestamp > 
fromTime) {
-                        segmentSearchResults.add(new 
SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+                        currentCumValueSize += currValueSize;
+                        currentDeserIndex++;
+                        return new SegmentSearchResult(currentDeserIndex - 1, 
currTimestamp, currNextTimestamp, value);
                     }
                 }
-
                 // prep for next iteration
-                currNextTimestamp = currTimestamp;
+                currentCumValueSize += Math.max(currValueSize, 0);
+                currentDeserIndex++;
+            }
+            // search in segment expected to find result but did not
+            return null;
+        }
+
+        private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+            return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+        }
+
+        private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+            return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
currentCumValueSize
+                                                       : segmentValue.length - 
(currentCumValueSize + currValueSize);
+        }
+
+        private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+            return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+                                                       : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        }
+
+        private void findValuesStartingIndex() {

Review Comment:
   Too bad we did not encode the number of elements in the segment. Seems like
   a premature optimization to save 4 bytes. Now we need to iterate over the full
   list... :(
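
To make the cost concrete, here is a sketch of the scan that becomes necessary without a stored element count. It assumes the layout implied by the diff: a 16-byte header (`nextTimestamp`, `minTimestamp`), then one `(timestamp, valueSize)` entry of 12 bytes per record, newest first, ending with the entry whose timestamp equals `minTimestamp`, then the values. `ScanSketch` is a hypothetical name. With a stored count `n`, the same index would simply be `16 + n * 12`.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the linear scan; not the PR's implementation.
final class ScanSketch {
    private static final int TIMESTAMP_SIZE = 8;
    private static final int VALUE_SIZE = 4;

    // Walks the metadata entries until the one carrying minTimestamp is found
    // and returns the offset of the first byte after the metadata.
    static int valuesStartingIndex(final byte[] segmentValue) {
        final ByteBuffer buffer = ByteBuffer.wrap(segmentValue);
        final long minTimestamp = buffer.getLong(TIMESTAMP_SIZE);
        int index = 2 * TIMESTAMP_SIZE;
        while (buffer.getLong(index) != minTimestamp) {
            index += TIMESTAMP_SIZE + VALUE_SIZE;
        }
        return index + TIMESTAMP_SIZE + VALUE_SIZE;
    }

    // Sample segment: nextTimestamp=30, minTimestamp=10, two records
    // (ts=20 with 2 value bytes, ts=10 with 3 value bytes), then 5 value bytes.
    static byte[] sampleSegment() {
        return ByteBuffer.allocate(45)
            .putLong(30L).putLong(10L)
            .putLong(20L).putInt(2)
            .putLong(10L).putInt(3)
            .put(new byte[5])
            .array();
    }

    public static void main(final String[] args) {
        System.out.println(valuesStartingIndex(sampleSegment()));
    }
}
```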



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -266,6 +276,11 @@ private static class PartiallyDeserializedSegmentValue 
implements SegmentValue {
         private int deserIndex = -1; // index up through which this segment 
has been deserialized (inclusive)
         private List<TimestampAndValueSize> 
unpackedReversedTimestampAndValueSizes;
         private List<Integer> cumulativeValueSizes; // ordered same as 
timestamp and value sizes (reverse time-sorted)
+        private int valuesStartingIndex = -1; // the index of first value in 
the segment

Review Comment:
   "first value" based on ts, right? but last value in the list... might be 
good to extend the comment



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
         }
 
         @Override
-        public List<SegmentSearchResult> findAll(final long fromTime, final 
long toTime) {
-            long currNextTimestamp = nextTimestamp;
-            final List<SegmentSearchResult> segmentSearchResults = new 
ArrayList<>();
-            long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-            int currValueSize;
-            int currIndex = 0;
-            int cumValueSize = 0;
-            while (currTimestamp != minTimestamp) {
-                final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+            // this segment does not have any record in query specified time 
range
+            if (toTime < minTimestamp || fromTime > nextTimestamp) {
+                return null;
+            }
+            long currTimestamp = -1;
+            long currNextTimestamp = -1;
+
+
+            if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+                findValuesStartingIndex();
+            }
+
+            while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+                final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
                 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-                currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-                cumValueSize += Math.max(currValueSize, 0);
+                currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+                                                                               
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+                final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);

Review Comment:
   As above: can we get this from a cache?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -34,7 +31,16 @@ public class LogicalSegmentIterator implements 
VersionedRecordIterator {
     private final Long fromTime;
     private final Long toTime;
     private final ResultOrder order;
-    private ListIterator<VersionedRecord<byte[]>> iterator;
+    // stores the raw value of the latestValueStore when latestValueStore is 
the current segment
+    private byte[] currentRawSegmentValue;
+    // stores the deserialized value of the current segment (when current 
segment is one of the old segments)
+    private RocksDBVersionedStoreSegmentValueFormatter.SegmentValue 
currentDeserializedSegmentValue;
+    // current segment minTimestamp (when current segment is not the 
latestValueStore)
+    private long minTimestamp;
+    // current segment nextTimestamp (when current segment is not the 
latestValueStore)
+    private long nextTimestamp;
+    private VersionedRecord next;

Review Comment:
   Add missing generic type `<byte[]>`



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, 
final boolean includeValue
         }
 
         @Override
-        public List<SegmentSearchResult> findAll(final long fromTime, final 
long toTime) {
-            long currNextTimestamp = nextTimestamp;
-            final List<SegmentSearchResult> segmentSearchResults = new 
ArrayList<>();
-            long currTimestamp = -1L; // choose an invalid timestamp. if this 
is valid, this needs to be re-worked
-            int currValueSize;
-            int currIndex = 0;
-            int cumValueSize = 0;
-            while (currTimestamp != minTimestamp) {
-                final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        public SegmentSearchResult find(final long fromTime, final long 
toTime, final ResultOrder order) {
+            // this segment does not have any record in query specified time 
range
+            if (toTime < minTimestamp || fromTime > nextTimestamp) {
+                return null;
+            }
+            long currTimestamp = -1;
+            long currNextTimestamp = -1;
+
+
+            if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == 
-1) {
+                findValuesStartingIndex();
+            }
+
+            while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+                final int timestampSegmentIndex = getTimestampIndex(order, 
currentDeserIndex);
                 currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-                currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-                cumValueSize += Math.max(currValueSize, 0);
+                currNextTimestamp = timestampSegmentIndex == 2 * 
TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata 
(timestamp + value size)
+                                                                               
 : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - 
(TIMESTAMP_SIZE + VALUE_SIZE));
+                final int currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
                 if (currValueSize >= 0) {
                     final byte[] value = new byte[currValueSize];
-                    final int valueSegmentIndex = segmentValue.length - 
cumValueSize;
+                    final int valueSegmentIndex = getValueSegmentIndex(order, 
currentCumValueSize, currValueSize);
                     System.arraycopy(segmentValue, valueSegmentIndex, value, 
0, currValueSize);
                     if (currTimestamp <= toTime && currNextTimestamp > 
fromTime) {
-                        segmentSearchResults.add(new 
SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+                        currentCumValueSize += currValueSize;
+                        currentDeserIndex++;
+                        return new SegmentSearchResult(currentDeserIndex - 1, 
currTimestamp, currNextTimestamp, value);
                     }
                 }
-
                 // prep for next iteration
-                currNextTimestamp = currTimestamp;
+                currentCumValueSize += Math.max(currValueSize, 0);
+                currentDeserIndex++;
+            }
+            // search in segment expected to find result but did not
+            return null;
+        }
+
+        private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+            return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+        }
+
+        private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+            return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
currentCumValueSize
+                                                       : segmentValue.length - 
(currentCumValueSize + currValueSize);
+        }
+
+        private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+            return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+                                                       : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        }
+
+        private void findValuesStartingIndex() {
+            long currTimestamp = -1;
+            int currIndex = 0;
+            int timestampSegmentIndex = 0;
+            while (currTimestamp != minTimestamp) {
+                timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * 
(TIMESTAMP_SIZE + VALUE_SIZE);
+                currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);

Review Comment:
   Given that we parse all timestamps, we should buffer them (similar to what
   we already do with `unpackedReversedTimestampAndValueSizes`). Could we re-use
   `unpackedReversedTimestampAndValueSizes`, make `valueSize` optional, and
   populate it later only on demand?
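
A sketch of the buffering idea, assuming the same layout as the diff (16-byte header, then 12-byte `(timestamp, valueSize)` entries, newest first, then values). `MetadataCacheSketch` is a hypothetical class. For simplicity it reads the value sizes eagerly during the single pass, which differs from the on-demand population suggested above. The point is only that the one scan needed to locate the values can also fill a cache, so later ascending lookups do not re-parse the bytes.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch; eager value sizes are a simplification.
final class MetadataCacheSketch {
    private static final int TIMESTAMP_SIZE = 8;
    private static final int VALUE_SIZE = 4;

    private final List<Long> timestamps = new ArrayList<>();    // newest first
    private final List<Integer> valueSizes = new ArrayList<>(); // same order
    private final int valuesStartingIndex;

    MetadataCacheSketch(final byte[] segmentValue) {
        final ByteBuffer buffer = ByteBuffer.wrap(segmentValue); // wrap once
        final long minTimestamp = buffer.getLong(TIMESTAMP_SIZE);
        int index = 2 * TIMESTAMP_SIZE;
        long timestamp;
        do {
            // Cache each entry while scanning towards the oldest record.
            timestamp = buffer.getLong(index);
            timestamps.add(timestamp);
            valueSizes.add(buffer.getInt(index + TIMESTAMP_SIZE));
            index += TIMESTAMP_SIZE + VALUE_SIZE;
        } while (timestamp != minTimestamp);
        this.valuesStartingIndex = index;
    }

    int recordCount() {
        return timestamps.size();
    }

    int valuesStartingIndex() {
        return valuesStartingIndex;
    }

    // i-th oldest timestamp, served from the cache.
    long timestampAscending(final int i) {
        return timestamps.get(timestamps.size() - 1 - i);
    }

    // i-th oldest value size, served from the cache.
    int valueSizeAscending(final int i) {
        return valueSizes.get(valueSizes.size() - 1 - i);
    }

    // Sample segment: nextTimestamp=30, minTimestamp=10, records at ts=20 and ts=10.
    static byte[] sampleSegment() {
        return ByteBuffer.allocate(45)
            .putLong(30L).putLong(10L)
            .putLong(20L).putInt(2)
            .putLong(10L).putInt(3)
            .put(new byte[5])
            .array();
    }

    public static void main(final String[] args) {
        final MetadataCacheSketch cache = new MetadataCacheSketch(sampleSegment());
        System.out.println(cache.recordCount() + " records, oldest ts " + cache.timestampAscending(0));
    }
}
```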


