aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420452896


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +134,49 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key and time range
-                    final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults =
-                            RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime);
-                    for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) {
-                        queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue);
+                    this.minTimestamp = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue);
+                    this.nextTimestamp = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue);
                 }
-            }
-            if (!queryResults.isEmpty()) {
-                break;
+                return true;
             }
         }
-        if (!queryResults.isEmpty()) {
-            // since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending.
-            this.iterator = order.equals(ResultOrder.ASCENDING) ? queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-            return true;
-        }
         // if all segments have been processed, release the snapshot
         releaseSnapshot();
         return false;
     }
 
+    private Object getNextRecord() {
+        VersionedRecord nextRecord = null;
+        if (currentRawSegmentValue != null) { // this is the latestValueStore
+            final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+            if (recordTimestamp <= toTime) {
+                final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+                // latest value satisfies timestamp bound
+                nextRecord = new VersionedRecord<>(value, recordTimestamp);
+            }
+        } else {
+            final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord =
+                    currentDeserializedSegmentValue.find(fromTime, toTime, order);
+            if (currentRecord != null) {
+                nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo());
+            }
+        }
+        // no relevant record can be found in the segment
+        if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), Long.parseLong(nextRecord.validTo().get().toString()))) {

Review Comment:
   > `Long.parseLong(nextRecord.validTo().get().toString())` -- why do we get valid-to as `String` and pipe it through `parseLong`? Why can't we just pass `nextRecord.validTo().get()` into `canSegmentHaveMoreRelevantRecords(...)`?
   
   I had to do that. Fixing the generic type has resolved all of these issues now :)
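   
   For context, here is a minimal, self-contained sketch of why the raw vs. parameterized generic matters here. `DemoVersionedRecord` and `RawTypeDemo` are hypothetical stand-ins, not the actual Kafka Streams classes; the only assumption carried over from the snippet above is that `validTo()` returns an `Optional`, as the `.get()` call suggests.
   
   ```java
   import java.util.Optional;
   
   // Hypothetical stand-in for VersionedRecord<V>, used only to illustrate the
   // raw-type problem; it is not the real Kafka Streams class.
   class DemoVersionedRecord<V> {
       private final long timestamp;
       private final Optional<Long> validTo;
   
       DemoVersionedRecord(final long timestamp, final long validTo) {
           this.timestamp = timestamp;
           this.validTo = Optional.of(validTo);
       }
   
       long timestamp() { return timestamp; }
       Optional<Long> validTo() { return validTo; }
   }
   
   public class RawTypeDemo {
       public static void main(final String[] args) {
           // With a raw reference, validTo() is seen as a raw Optional, so get()
           // returns Object and the value has to be squeezed through
           // toString()/parseLong to recover a long.
           final DemoVersionedRecord raw = new DemoVersionedRecord<byte[]>(10L, 20L);
           final long viaString = Long.parseLong(raw.validTo().get().toString());
   
           // With the generic type declared, get() returns a Long that unboxes
           // directly, so the String round-trip disappears.
           final DemoVersionedRecord<byte[]> typed = new DemoVersionedRecord<>(10L, 20L);
           final long direct = typed.validTo().get();
   
           System.out.println(viaString == direct); // prints: true
       }
   }
   ```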


