aliehsaeedii commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1420641426


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -340,32 +355,67 @@ public SegmentSearchResult find(final long timestamp, final boolean includeValue
         }
 
         @Override
-        public List<SegmentSearchResult> findAll(final long fromTime, final long toTime) {
-            long currNextTimestamp = nextTimestamp;
-            final List<SegmentSearchResult> segmentSearchResults = new ArrayList<>();
-            long currTimestamp = -1L; // choose an invalid timestamp. if this is valid, this needs to be re-worked
-            int currValueSize;
-            int currIndex = 0;
-            int cumValueSize = 0;
-            while (currTimestamp != minTimestamp) {
-                final int timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        public SegmentSearchResult find(final long fromTime, final long toTime, final ResultOrder order) {
+            // this segment does not have any record in query specified time range
+            if (toTime < minTimestamp || fromTime > nextTimestamp) {
+                return null;
+            }
+            long currTimestamp = -1;
+            long currNextTimestamp = -1;
+
+
+            if (order.equals(ResultOrder.ASCENDING) && valuesStartingIndex == -1) {
+                findValuesStartingIndex();
+            }
+
+            while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+                final int timestampSegmentIndex = getTimestampIndex(order, currentDeserIndex);
                 currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
-                currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
-                cumValueSize += Math.max(currValueSize, 0);
+                currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE ? nextTimestamp // if this is the first record metadata (timestamp + value size)
+                                                                                : ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + VALUE_SIZE));
+                final int currValueSize = ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
                 if (currValueSize >= 0) {
                     final byte[] value = new byte[currValueSize];
-                    final int valueSegmentIndex = segmentValue.length - cumValueSize;
+                    final int valueSegmentIndex = getValueSegmentIndex(order, currentCumValueSize, currValueSize);
                     System.arraycopy(segmentValue, valueSegmentIndex, value, 0, currValueSize);
                     if (currTimestamp <= toTime && currNextTimestamp > fromTime) {
-                        segmentSearchResults.add(new SegmentSearchResult(currIndex, currTimestamp, currNextTimestamp, value));
+                        currentCumValueSize += currValueSize;
+                        currentDeserIndex++;
+                        return new SegmentSearchResult(currentDeserIndex - 1, currTimestamp, currNextTimestamp, value);
                     }
                 }
-
                 // prep for next iteration
-                currNextTimestamp = currTimestamp;
+                currentCumValueSize += Math.max(currValueSize, 0);
+                currentDeserIndex++;
+            }
+            // search in segment expected to find result but did not
+            return null;
+        }
+
+        private boolean hasStillRecord(final long currTimestamp, final long currNextTimestamp, final ResultOrder order) {
+            return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != nextTimestamp : currTimestamp != minTimestamp;
+        }
+
+        private int getValueSegmentIndex(final ResultOrder order, final int currentCumValueSize, final int currValueSize) {
+            return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + currentCumValueSize
+                                                       : segmentValue.length - (currentCumValueSize + currValueSize);
+        }
+
+        private int getTimestampIndex(final ResultOrder order, final int currIndex) {
+            return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - ((currIndex + 1) * (TIMESTAMP_SIZE + VALUE_SIZE))
+                                                       : 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+        }
+
+        private void findValuesStartingIndex() {
+            long currTimestamp = -1;
+            int currIndex = 0;
+            int timestampSegmentIndex = 0;
+            while (currTimestamp != minTimestamp) {
+                timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+                currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);

Review Comment:
   > Given that we parse all timestamps, we should buffer them
   
   I do agree with you.
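   
   To make the buffering concrete: below is a minimal, stand-alone sketch of one way it could look, assuming the segment layout this formatter uses (an 8-byte `nextTimestamp`, an 8-byte `minTimestamp`, then repeated `(timestamp, valueSize)` pairs ahead of the packed values). The class and member names (`SegmentTimestampBuffer`, `parseAllTimestamps`, etc.) are hypothetical, not part of this PR.
   
   ```java
   import java.nio.ByteBuffer;
   import java.util.ArrayList;
   import java.util.List;
   
   // Illustrative sketch only: a single pass over the record metadata
   // (as findValuesStartingIndex() already does), buffering every parsed
   // timestamp so later lookups never re-wrap the byte array.
   final class SegmentTimestampBuffer {
       private static final int TIMESTAMP_SIZE = 8; // long
       private static final int VALUE_SIZE = 4;     // int
   
       private final byte[] segmentValue;
       private final long minTimestamp;
       private final List<Long> timestamps = new ArrayList<>();
       private int valuesStartingIndex;
   
       SegmentTimestampBuffer(final byte[] segmentValue) {
           this.segmentValue = segmentValue;
           // layout assumption: nextTimestamp at offset 0, minTimestamp at offset 8
           this.minTimestamp = ByteBuffer.wrap(segmentValue).getLong(TIMESTAMP_SIZE);
           parseAllTimestamps();
       }
   
       private void parseAllTimestamps() {
           long currTimestamp = -1;
           int currIndex = 0;
           int timestampSegmentIndex = 0;
           // walk the (timestamp, valueSize) pairs until the oldest record
           while (currTimestamp != minTimestamp) {
               timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
               currTimestamp = ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
               timestamps.add(currTimestamp); // buffer instead of re-parsing later
               currIndex++;
           }
           // first byte past the last (timestamp, valueSize) pair
           valuesStartingIndex = timestampSegmentIndex + TIMESTAMP_SIZE + VALUE_SIZE;
       }
   
       long timestampAt(final int recordIndex) {
           return timestamps.get(recordIndex); // O(1), no byte parsing
       }
   
       int valuesStartingIndex() {
           return valuesStartingIndex;
       }
   }
   ```
   
   With the timestamps buffered once up front, `find(fromTime, toTime, order)` could read `timestamps.get(currentDeserIndex)` in both the ascending and descending branches instead of computing `getTimestampIndex(...)` and wrapping `segmentValue` on every iteration.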


