mjsax commented on code in PR #14957:
URL: https://github.com/apache/kafka/pull/14957#discussion_r1423917600


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +130,48 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key 
and time range
-                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =
-                            
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-                    for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-                        queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = new 
ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue);
                 }
-            }
-            if (!queryResults.isEmpty()) {
-                break;
+                return true;
             }
         }
-        if (!queryResults.isEmpty()) {
-            // since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-            this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-            return true;
-        }
         // if all segments have been processed, release the snapshot
         releaseSnapshot();
         return false;
     }
 
+    private VersionedRecord<byte[]> getNextRecord() {
+        VersionedRecord<byte[]> nextRecord = null;
+        if (currentRawSegmentValue != null) { // this is the latestValueStore
+            final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+            if (recordTimestamp <= toTime) {
+                final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+                // latest value satisfies timestamp bound
+                nextRecord = new VersionedRecord<>(value, recordTimestamp);
+            }
+        } else {
+            final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+                    currentDeserializedSegmentValue.find(fromTime, toTime, 
order, nextIndex);
+            if (currentRecord != null) {
+                nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+                this.nextIndex = order.equals(ResultOrder.ASCENDING) ? 
currentRecord.index() - 1 : currentRecord.index() + 1;
+            }
+        }
+        // no relevant record can be found in the segment
+        if (currentRawSegmentValue != null || nextRecord == null || 
!canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), 
nextRecord.validTo().get())) {
+            prepareToFetchNextSegment();
+        }
+        return nextRecord;
+    }
+
+    private boolean canSegmentHaveMoreRelevantRecords(final long 
currentValidFrom, final long currentValidTo) {
+        final boolean isCurrentOutsideTimeRange = 
(order.equals(ResultOrder.ASCENDING) && (currentValidTo > toTime || 
currentDeserializedSegmentValue.nextTimestamp() == currentValidTo))

Review Comment:
   This is complex. Might be simpler to use:
   ```
   final boolean isCurrentOutsideTimeRange;
   if (order.equals(ResultOrder.ASCENDING)) {
       isCurrentOutsideTimeRange = currentValidTo > toTime || 
currentDeserializedSegmentValue.nextTimestamp() == currentValidTo;
   } else {
       isCurrentOutsideTimeRange = currentValidFrom < fromTime || 
currentDeserializedSegmentValue.minTimestamp() == currentValidFrom;
   }
   ```



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes 
= new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    public long nextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {

Review Comment:
   Do we need this method? Cannot find where it's used (except test code)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes 
= new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    public long nextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {
+        return segmentValue;
+    }
+
+
+    public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+            final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+        // this segment does not have any record in query specified time range
+        if (toTime < minTimestamp || fromTime > nextTimestamp) {
+            return null;
+        }
+
+        final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+        if (isAscending && valuesStartingIndex == -1) {
+            findValuesStartingIndex();
+            deserIndex = recordNumber;
+        }
+
+        long currTimestamp = -1;
+        long currNextTimestamp = -1;
+        int currIndex = initializeCurrentIndex(index, isAscending);
+        int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);
+        int currValueSize;
+
+
+        while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+            if (hasBeenDeserialized(isAscending, currIndex)) {
+                final TimestampAndValueSize curr;
+                curr = unpackedTimestampAndValueSizes.get(currIndex);
+                currTimestamp = curr.timestamp;
+                cumValueSize = cumulativeValueSizes.get(currIndex);
+                currValueSize = curr.valueSize;
+
+                // update currValueSize
+                if (currValueSize == Integer.MIN_VALUE) {
+                    final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+                    currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                    unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, cumValueSize));
+                }
+
+                currNextTimestamp = updateCurrNextTimestamp(currIndex, 
isAscending);
+
+            } else {
+                final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+                currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+                currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE
+                        ? nextTimestamp // if this is the first record 
metadata (timestamp + value size)
+                        : 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + 
VALUE_SIZE));
+                cumValueSize += Math.max(currValueSize, 0);
+
+                // update deserHelpers
+                deserIndex = currIndex;
+                unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, currValueSize));
+                cumulativeValueSizes.put(currIndex, cumValueSize);
+            }
+
+            if (currValueSize >= 0) {
+                final byte[] value = new byte[currValueSize];
+                final int valueSegmentIndex = getValueSegmentIndex(order, 
cumValueSize, currValueSize);
+                System.arraycopy(segmentValue, valueSegmentIndex, value, 0, 
currValueSize);

Review Comment:
   Can we move this line inside the `if` block (or maybe even all three 
lines?), to only do the array copy if we return a result? Otherwise, we might 
do this operation wastefully?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -150,7 +154,15 @@ interface SegmentValue {
          */
         SegmentSearchResult find(long timestamp, boolean includeValue);
 
-        List<SegmentSearchResult> findAll(long fromTime, long toTime);
+//        /**
+//         * Finds the next/previous record in this segment row that is valid 
within the query specified time range
+//         *
+//         * @param fromTime the query starting time point
+//         * @param toTime the query ending time point
+//         * @param order specifies the order (based on timestamp) in which 
records are returned
+//         * @return the record that is found, null if no record is found
+//         */
+//        SegmentSearchResult find(long fromTime, long toTime, ResultOrder 
order, int index);

Review Comment:
   Can this commented-out Javadoc and method declaration be removed?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -69,23 +73,49 @@ public boolean hasNext() {
         if (!open) {
             throw new IllegalStateException("The iterator is out of scope.");
         }
-        // since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-        return hasStillLoad || maybeFillIterator();
+        if (this.next != null) {
+            return true;
+        }
+
+        while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+            boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+            if (!hasSegmentValue) {
+                hasSegmentValue = maybeFillCurrentSegmentValue();
+            }
+            if (hasSegmentValue) {
+                this.next  = getNextRecord();
+                if (this.next == null) {
+                    prepareToFetchNextSegment();
+                }
+            }
+        }
+        return this.next != null;
     }
 
     @Override
-    public Object next() {
-        if (hasNext()) {
-            // since data is stored in descending order in the segments, 
retrieve previous record, if the order is Ascending.
-            return order.equals(ResultOrder.ASCENDING) ? iterator.previous() : 
iterator.next();
+    public VersionedRecord<byte[]> next() {
+        if (this.next == null) {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
         }
-        throw new NoSuchElementException();
+        assert this.next != null;

Review Comment:
   We usually don't use `assert`. Can be removed.



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@RunWith(Enclosed.class)
+public class ReadonlyPartiallyDeserializedSegmentValueTest {
+
+    /**
+     * Non-exceptional scenarios which are expected to occur during regular 
store operation.
+     */
+    @RunWith(Parameterized.class)
+    public static class ExpectedCasesTest {
+
+        private static final List<TestCase> TEST_CASES = new ArrayList<>();
+
+        static {
+            // test cases are expected to have timestamps in strictly 
decreasing order (except for the degenerate case)
+            TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+            TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 
10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 
10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 
10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), 
new TestRecord(null, 2), new TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 
10, new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+            TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, 
new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+            TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 
10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+        }
+
+        private final TestCase testCase;
+
+        public ExpectedCasesTest(final TestCase testCase) {
+            this.testCase = testCase;
+        }
+
+        @Parameterized.Parameters(name = "{0}")
+        public static Collection<TestCase> data() {
+            return TEST_CASES;
+        }
+
+        @Test
+        public void shouldFindInTimeRangesWithDifferentOrders() {
+
+            // create a list of timestamps in ascending order to use them in 
combination for starting and ending point of the time range.
+            final List<Long> timestamps = 
createTimestampsFromTestRecords(testCase);
+
+            // verify results
+            final List<ResultOrder> orders = 
Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING);
+            for (final ResultOrder order: orders) {
+                for (final Long from : timestamps) {
+                    for (final Long to : timestamps) {
+                        // build expected results indices based on time range
+                        final List<Integer> expectedRecordsIndices = new 
ArrayList<>();
+                        for (int i = 0; i < testCase.records.size(); i++) {
+                            final long recordValidTo = i == 0 ? 
testCase.nextTimestamp : testCase.records.get(i - 1).timestamp;
+                            if (testCase.records.get(i).timestamp <= to && 
recordValidTo > from) {
+                                if (testCase.records.get(i).value != null) { 
// the results do not include tombstones
+                                    expectedRecordsIndices.add(i);
+                                }
+                            }
+                        }
+                        // The only object that has access to the find method 
is the instance of LogicalSegmentIterator.
+                        // Therefore, closing the iterator means that the 
segment calling the find method is destroyed and needs
+                        // to be created for the next time range.
+                        final ReadonlyPartiallyDeserializedSegmentValue 
segmentValue = buildSegmentWithInsertLatest(testCase);
+                        if (order.equals(ResultOrder.ASCENDING)) {
+                            Collections.reverse(expectedRecordsIndices);
+                        }
+                        int segmentRecordIndex = -1;
+                        for (final int index : expectedRecordsIndices) {
+                            final long expectedValidTo = index == 0 ? 
testCase.nextTimestamp : testCase.records.get(index - 1).timestamp;
+
+                            final SegmentSearchResult result = 
segmentValue.find(from, to, order, segmentRecordIndex);
+                            assert result != null;
+
+                            final TestRecord expectedRecord = 
testCase.records.get(index);
+
+                            assertThat(result.index(), equalTo(index));
+                            assertThat(result.value(), 
equalTo(expectedRecord.value));
+                            assertThat(result.validFrom(), 
equalTo(expectedRecord.timestamp));
+                            assertThat(result.validTo(), 
equalTo(expectedValidTo));
+
+                            segmentRecordIndex = 
order.equals(ResultOrder.ASCENDING) ? index - 1 : index + 1;
+                        }
+                        // verify no results within the time range
+                        if (expectedRecordsIndices.size() == 0) {
+                            assertNull(segmentValue.find(from, to, order, -1));
+                        }
+                    }
+                }
+            }
+        }
+
+        @Test
+        public void shouldGetTimestamps() {
+            final byte[] segmentValue = 
buildSegmentWithInsertLatest(testCase).serialize();
+
+            
assertThat(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue),
 equalTo(testCase.nextTimestamp));
+            
assertThat(RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue),
 equalTo(testCase.minTimestamp));
+        }
+    }
+
+    private static ReadonlyPartiallyDeserializedSegmentValue 
buildSegmentWithInsertLatest(final TestCase testCase) {
+        SegmentValue segmentValue = null;
+        for (int recordIdx = testCase.records.size() - 1; recordIdx >= 0; 
recordIdx--) {
+            final TestRecord record = testCase.records.get(recordIdx);
+            final long validTo = recordIdx == 0 ? testCase.nextTimestamp : 
testCase.records.get(recordIdx - 1).timestamp;
+
+            if (segmentValue == null) {
+                // initialize
+                if (testCase.records.size() > 1 && record.value == null) {
+                    // when possible, validate that building up a segment 
starting from the degenerate case is valid as well
+                    segmentValue = 
RocksDBVersionedStoreSegmentValueFormatter.newSegmentValueWithRecord(null, 
record.timestamp, record.timestamp);
+                } else {
+                    segmentValue = 
RocksDBVersionedStoreSegmentValueFormatter.newSegmentValueWithRecord(record.value,
 record.timestamp, validTo);
+                }
+            } else {
+                // insert latest
+                segmentValue.insertAsLatest(record.timestamp, validTo, 
record.value);
+            }
+        }
+        assert segmentValue != null;

Review Comment:
   `assertThat(..., notNullValue());` ? (Or could we even drop this line?)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;

Review Comment:
   Given it's read-only, should it be `final` (same for `nextTimestamp` and 
`minTimestamp`)?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +130,48 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key 
and time range
-                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =
-                            
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-                    for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-                        queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = new 
ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue);
                 }
-            }
-            if (!queryResults.isEmpty()) {
-                break;
+                return true;
             }
         }
-        if (!queryResults.isEmpty()) {
-            // since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-            this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-            return true;
-        }
         // if all segments have been processed, release the snapshot
         releaseSnapshot();
         return false;
     }
 
+    private VersionedRecord<byte[]> getNextRecord() {
+        VersionedRecord<byte[]> nextRecord = null;
+        if (currentRawSegmentValue != null) { // this is the latestValueStore
+            final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+            if (recordTimestamp <= toTime) {
+                final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+                // latest value satisfies timestamp bound
+                nextRecord = new VersionedRecord<>(value, recordTimestamp);
+            }
+        } else {
+            final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+                    currentDeserializedSegmentValue.find(fromTime, toTime, 
order, nextIndex);
+            if (currentRecord != null) {
+                nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+                this.nextIndex = order.equals(ResultOrder.ASCENDING) ? 
currentRecord.index() - 1 : currentRecord.index() + 1;
+            }
+        }
+        // no relevant record can be found in the segment
+        if (currentRawSegmentValue != null || nextRecord == null || 
!canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), 
nextRecord.validTo().get())) {

Review Comment:
   To make sure I understand this correctly:
   
   - If `nextRecord == null` it means we did not find anything and want to go 
to the next segment (make sense)
   - If `currentRawSegmentValue != null` it does not matter if we filled 
`nextRecord` or not, because the "latest" segment contains at most one record, 
and thus we need to step into the next segment in any case?
   - Similarly, for `canSegmentHaveMoreRelevantRecords`, we check if we exceeded 
the current segment?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes 
= new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    public long nextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {
+        return segmentValue;
+    }
+
+
+    public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+            final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+        // this segment does not have any record in query specified time range
+        if (toTime < minTimestamp || fromTime > nextTimestamp) {
+            return null;
+        }
+
+        final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+        if (isAscending && valuesStartingIndex == -1) {
+            findValuesStartingIndex();
+            deserIndex = recordNumber;
+        }
+
+        long currTimestamp = -1;
+        long currNextTimestamp = -1;
+        int currIndex = initializeCurrentIndex(index, isAscending);
+        int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);

Review Comment:
   Not sure why we need to init `cumValueSize`. It seems that below we never 
read the value set here?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes 
= new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    public long nextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {
+        return segmentValue;
+    }
+
+
+    public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+            final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+        // this segment does not have any record in query specified time range
+        if (toTime < minTimestamp || fromTime > nextTimestamp) {
+            return null;
+        }
+
+        final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+        if (isAscending && valuesStartingIndex == -1) {
+            findValuesStartingIndex();
+            deserIndex = recordNumber;
+        }
+
+        long currTimestamp = -1;
+        long currNextTimestamp = -1;
+        int currIndex = initializeCurrentIndex(index, isAscending);
+        int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);
+        int currValueSize;
+
+
+        while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+            if (hasBeenDeserialized(isAscending, currIndex)) {
+                final TimestampAndValueSize curr;
+                curr = unpackedTimestampAndValueSizes.get(currIndex);
+                currTimestamp = curr.timestamp;
+                cumValueSize = cumulativeValueSizes.get(currIndex);
+                currValueSize = curr.valueSize;
+
+                // update currValueSize
+                if (currValueSize == Integer.MIN_VALUE) {
+                    final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+                    currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                    unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, cumValueSize));

Review Comment:
   `cumValueSize` is not updated with `currValueSize`. Why?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes 
= new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long minTimestamp() {
+        return minTimestamp;
+    }
+
+    public long nextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {
+        return segmentValue;
+    }
+
+
+    public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+            final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+        // this segment does not have any record in query specified time range
+        if (toTime < minTimestamp || fromTime > nextTimestamp) {
+            return null;
+        }
+
+        final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+        if (isAscending && valuesStartingIndex == -1) {
+            findValuesStartingIndex();
+            deserIndex = recordNumber;
+        }
+
+        long currTimestamp = -1;
+        long currNextTimestamp = -1;
+        int currIndex = initializeCurrentIndex(index, isAscending);
+        int cumValueSize = initializeCumValueSize(index, currIndex, 
isAscending);
+        int currValueSize;
+
+
+        while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+            if (hasBeenDeserialized(isAscending, currIndex)) {
+                final TimestampAndValueSize curr;
+                curr = unpackedTimestampAndValueSizes.get(currIndex);
+                currTimestamp = curr.timestamp;
+                cumValueSize = cumulativeValueSizes.get(currIndex);

Review Comment:
   nit: flip the order (first use `curr` to read out both fields -- the current 
order breaks the flow when reading the code)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -100,34 +130,48 @@ private boolean maybeFillIterator() {
             final byte[] rawSegmentValue = segment.get(key, snapshot);
             if (rawSegmentValue != null) { // this segment contains record(s) 
with the specified key
                 if (segment.id() == -1) { // this is the latestValueStore
-                    final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue);
-                    if (recordTimestamp <= toTime) {
-                        // latest value satisfies timestamp bound
-                        queryResults.add(new 
VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue),
 recordTimestamp));
-                    }
+                    this.currentRawSegmentValue = rawSegmentValue;
                 } else {
-                    // this segment contains records with the specified key 
and time range
-                    final 
List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult>
 searchResults =
-                            
RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime,
 toTime);
-                    for (final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
searchResult : searchResults) {
-                        queryResults.add(new 
VersionedRecord<>(searchResult.value(), searchResult.validFrom(), 
searchResult.validTo()));
-                    }
+                    this.currentDeserializedSegmentValue = new 
ReadonlyPartiallyDeserializedSegmentValue(rawSegmentValue);
                 }
-            }
-            if (!queryResults.isEmpty()) {
-                break;
+                return true;
             }
         }
-        if (!queryResults.isEmpty()) {
-            // since data is stored in descending order in the segments, 
create the list in reverse order, if the order is Ascending.
-            this.iterator = order.equals(ResultOrder.ASCENDING) ? 
queryResults.listIterator(queryResults.size()) : queryResults.listIterator();
-            return true;
-        }
         // if all segments have been processed, release the snapshot
         releaseSnapshot();
         return false;
     }
 
+    private VersionedRecord<byte[]> getNextRecord() {
+        VersionedRecord<byte[]> nextRecord = null;
+        if (currentRawSegmentValue != null) { // this is the latestValueStore
+            final long recordTimestamp = 
RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue);
+            if (recordTimestamp <= toTime) {
+                final byte[] value = 
RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue);
+                // latest value satisfies timestamp bound
+                nextRecord = new VersionedRecord<>(value, recordTimestamp);
+            }
+        } else {
+            final 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
currentRecord =
+                    currentDeserializedSegmentValue.find(fromTime, toTime, 
order, nextIndex);
+            if (currentRecord != null) {
+                nextRecord = new VersionedRecord<>(currentRecord.value(), 
currentRecord.validFrom(), currentRecord.validTo());
+                this.nextIndex = order.equals(ResultOrder.ASCENDING) ? 
currentRecord.index() - 1 : currentRecord.index() + 1;

Review Comment:
   We can add a new member `final int increment = 
order.equals(ResultOrder.ASCENDING) ? -1 : 1;` (we init it in the constructor) 
and simply call `this.nextIndex += increment` (this avoids the if-else in the 
hot path of the runtime code)
   
   Might need a comment explaining why ascending is `-1` (which is unintuitive)



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java:
##########
@@ -69,23 +73,49 @@ public boolean hasNext() {
         if (!open) {
             throw new IllegalStateException("The iterator is out of scope.");
         }
-        // since data is stored in descending order in the segments, check 
whether there is any previous record, if the order is Ascending.
-        final boolean hasStillLoad = order.equals(ResultOrder.ASCENDING) ? 
iterator.hasPrevious() : iterator.hasNext();
-        return hasStillLoad || maybeFillIterator();
+        if (this.next != null) {
+            return true;
+        }
+
+        while ((currentDeserializedSegmentValue != null || 
currentRawSegmentValue != null || segmentIterator.hasNext()) && this.next == 
null) {
+            boolean hasSegmentValue = currentDeserializedSegmentValue != null 
|| currentRawSegmentValue != null;
+            if (!hasSegmentValue) {
+                hasSegmentValue = maybeFillCurrentSegmentValue();
+            }
+            if (hasSegmentValue) {
+                this.next  = getNextRecord();

Review Comment:
   nit: double space



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreSegmentValueFormatter.java:
##########
@@ -136,6 +137,9 @@ static SegmentValue newSegmentValueWithRecord(
     }
 
     interface SegmentValue {
+        long getMinTimestamp();
+
+        long getNextTimestamp();

Review Comment:
   Why do we need to add this here? The newly added 
`ReadonlyPartiallyDeserializedSegmentValue` does not implement this interface.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValue.java:
##########
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+
+final class ReadonlyPartiallyDeserializedSegmentValue {
+
+    private static final int TIMESTAMP_SIZE = 8;
+    private static final int VALUE_SIZE = 4;
+    private byte[] segmentValue;
+    private long nextTimestamp;
+    private long minTimestamp;
+
+    private int deserIndex = -1; // index up through which this segment has 
been deserialized (inclusive)
+
+    private Map<Integer, Integer> cumulativeValueSizes;
+
+    private int valuesStartingIndex = -1; // the index of the first value in 
the segment (but the last one in the list)
+    private Map<Integer, TimestampAndValueSize> unpackedTimestampAndValueSizes 
= new HashMap<>();
+    private int recordNumber = -1; // number of segment records
+
+
+    ReadonlyPartiallyDeserializedSegmentValue(final byte[] segmentValue) {
+        this.segmentValue = segmentValue;
+        this.nextTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(segmentValue);
+        this.minTimestamp =
+                
RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(segmentValue);
+        resetDeserHelpers();
+    }
+
+
+    public long getMinTimestamp() {
+        return minTimestamp;
+    }
+
+    public long getNextTimestamp() {
+        return nextTimestamp;
+    }
+
+    public byte[] serialize() {
+        return segmentValue;
+    }
+
+
+    public 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult 
find(
+            final long fromTime, final long toTime, final ResultOrder order, 
final int index) {
+
+        // this segment does not have any record in query specified time range
+        if (toTime < minTimestamp || fromTime > nextTimestamp) {
+            return null;
+        }
+
+        final boolean isAscending = order.equals(ResultOrder.ASCENDING);
+
+        if (isAscending && valuesStartingIndex == -1) {
+            findValuesStartingIndex();
+            deserIndex = recordNumber;
+        }
+
+        long currTimestamp = -1;
+        long currNextTimestamp = -1;
+        int currIndex = initializeCurrentIndex(index, isAscending);
+        int cumValueSize = initializeCumvalueSize(index, currIndex, 
isAscending);
+        int currValueSize;
+
+
+        while (hasStillRecord(currTimestamp, currNextTimestamp, order)) {
+            if (hasBeenDeserialized(isAscending, currIndex)) {
+                final TimestampAndValueSize curr;
+                curr = unpackedTimestampAndValueSizes.get(currIndex);
+                currTimestamp = curr.timestamp;
+                cumValueSize = cumulativeValueSizes.get(currIndex);
+                currValueSize = curr.valueSize;
+
+                // update currValueSize
+                if (currValueSize == Integer.MIN_VALUE) {
+                    final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+                    currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                    unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, cumValueSize));
+                }
+
+                currNextTimestamp = updateCurrNextTimestamp(currIndex, 
isAscending);
+
+            } else {
+                final int timestampSegmentIndex = getTimestampIndex(order, 
currIndex);
+                currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+                currValueSize = 
ByteBuffer.wrap(segmentValue).getInt(timestampSegmentIndex + TIMESTAMP_SIZE);
+                currNextTimestamp = timestampSegmentIndex == 2 * TIMESTAMP_SIZE
+                        ? nextTimestamp // if this is the first record 
metadata (timestamp + value size)
+                        : 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex - (TIMESTAMP_SIZE + 
VALUE_SIZE));
+                cumValueSize += Math.max(currValueSize, 0);
+
+                // update deserHelpers
+                deserIndex = currIndex;
+                unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, currValueSize));
+                cumulativeValueSizes.put(currIndex, cumValueSize);
+            }
+
+            if (currValueSize >= 0) {
+                final byte[] value = new byte[currValueSize];
+                final int valueSegmentIndex = getValueSegmentIndex(order, 
cumValueSize, currValueSize);
+                System.arraycopy(segmentValue, valueSegmentIndex, value, 0, 
currValueSize);
+                if (currTimestamp <= toTime && currNextTimestamp > fromTime) {
+                    return new 
RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult(currIndex,
 currTimestamp, currNextTimestamp, value);
+                }
+            }
+            // prep for next iteration
+            currIndex = isAscending ? currIndex - 1 : currIndex + 1;
+        }
+        // search in segment expected to find result but did not
+        return null;
+    }
+
+    private long updateCurrNextTimestamp(final int currIndex, final boolean 
isAscending) {
+        if (isAscending) {
+            return currIndex == recordNumber - 1 ? nextTimestamp : 
unpackedTimestampAndValueSizes.get(currIndex + 1).timestamp;
+        } else {
+            return currIndex == 0 ? nextTimestamp : 
unpackedTimestampAndValueSizes.get(currIndex - 1).timestamp;
+        }
+    }
+
+    private int initializeCumvalueSize(final int index, final int currIndex, 
final boolean isAscending) {
+        return (index == Integer.MAX_VALUE || (!isAscending && index == 0)) ? 0
+                                                                            : 
isAscending ? cumulativeValueSizes.get(currIndex + 1)
+                                                                               
           : cumulativeValueSizes.get(currIndex - 1);
+    }
+
+    private int initializeCurrentIndex(final int index, final boolean 
isAscending) {
+        return isAscending && index == Integer.MAX_VALUE ? recordNumber - 1 : 
index;
+    }
+
+
+    private boolean hasStillRecord(final long currTimestamp, final long 
currNextTimestamp, final ResultOrder order) {
+        return order.equals(ResultOrder.ASCENDING) ? currNextTimestamp != 
nextTimestamp : currTimestamp != minTimestamp;
+    }
+
+    private boolean hasBeenDeserialized(final boolean isAscending, final int 
currIndex) {
+        if (!isAscending) {
+            return currIndex <= deserIndex;
+        }
+        return currIndex >= deserIndex;
+    }
+
+    private int getValueSegmentIndex(final ResultOrder order, final int 
currentCumValueSize, final int currValueSize) {
+        return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex + 
(currentCumValueSize - currValueSize)
+                                                   : segmentValue.length - 
currentCumValueSize;
+    }
+
+    private int getTimestampIndex(final ResultOrder order, final int 
currIndex) {
+        return order.equals(ResultOrder.ASCENDING) ? valuesStartingIndex - 
((recordNumber - currIndex) * (TIMESTAMP_SIZE + VALUE_SIZE))
+                                                   : 2 * TIMESTAMP_SIZE + 
currIndex * (TIMESTAMP_SIZE + VALUE_SIZE);
+    }
+
+    private void findValuesStartingIndex() {
+        long currTimestamp = -1;
+        int currIndex = 0;
+        int timestampSegmentIndex = 0;
+        while (currTimestamp != minTimestamp) {
+            timestampSegmentIndex = 2 * TIMESTAMP_SIZE + currIndex * 
(TIMESTAMP_SIZE + VALUE_SIZE);
+            currTimestamp = 
ByteBuffer.wrap(segmentValue).getLong(timestampSegmentIndex);
+            unpackedTimestampAndValueSizes.put(currIndex, new 
TimestampAndValueSize(currTimestamp, Integer.MIN_VALUE));

Review Comment:
   I see... Not sure. Should be fine. Might be worth a comment?



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/ReadonlyPartiallyDeserializedSegmentValueTest.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.streams.query.ResultOrder;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue;
+import 
org.apache.kafka.streams.state.internals.RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult;
+import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+@RunWith(Enclosed.class)
+public class ReadonlyPartiallyDeserializedSegmentValueTest {
+
+    /**
+     * Non-exceptional scenarios which are expected to occur during regular 
store operation.
+     */
+    @RunWith(Parameterized.class)
+    public static class ExpectedCasesTest {
+
+        private static final List<TestCase> TEST_CASES = new ArrayList<>();
+
+        static {
+            // test cases are expected to have timestamps in strictly 
decreasing order (except for the degenerate case)
+            TEST_CASES.add(new TestCase("single record", 10, new 
TestRecord("foo".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("multiple records", 10, new 
TestRecord("foo".getBytes(), 8), new TestRecord("bar".getBytes(), 3), new 
TestRecord("baz".getBytes(), 0)));
+            TEST_CASES.add(new TestCase("single tombstone", 10, new 
TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("multiple tombstone", 10, new 
TestRecord(null, 4), new TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (r, t, r)", 
10, new TestRecord("foo".getBytes(), 5), new TestRecord(null, 2), new 
TestRecord("bar".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (t, r, t)", 
10, new TestRecord(null, 5), new TestRecord("foo".getBytes(), 2), new 
TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (r, r, t, t)", 
10, new TestRecord("foo".getBytes(), 6), new TestRecord("bar".getBytes(), 5), 
new TestRecord(null, 2), new TestRecord(null, 1)));
+            TEST_CASES.add(new TestCase("tombstones and records (t, t, r, r)", 
10, new TestRecord(null, 7), new TestRecord(null, 6), new 
TestRecord("foo".getBytes(), 2), new TestRecord("bar".getBytes(), 1)));
+            TEST_CASES.add(new TestCase("record with empty bytes", 10, new 
TestRecord(new byte[0], 1)));
+            TEST_CASES.add(new TestCase("records with empty bytes (r, e)", 10, 
new TestRecord("foo".getBytes(), 4), new TestRecord(new byte[0], 1)));
+            TEST_CASES.add(new TestCase("records with empty bytes (e, e, r)", 
10, new TestRecord(new byte[0], 8), new TestRecord(new byte[0], 2), new 
TestRecord("foo".getBytes(), 1)));
+        }
+
+        private final TestCase testCase;
+
+        public ExpectedCasesTest(final TestCase testCase) {
+            this.testCase = testCase;
+        }
+
+        @Parameterized.Parameters(name = "{0}")
+        public static Collection<TestCase> data() {
+            return TEST_CASES;
+        }
+
+        @Test
+        public void shouldFindInTimeRangesWithDifferentOrders() {
+
+            // create a list of timestamps in ascending order to use them in 
combination for starting and ending point of the time range.
+            final List<Long> timestamps = 
createTimestampsFromTestRecords(testCase);
+
+            // verify results
+            final List<ResultOrder> orders = 
Arrays.asList(ResultOrder.ASCENDING, ResultOrder.ANY, ResultOrder.DESCENDING);
+            for (final ResultOrder order: orders) {
+                for (final Long from : timestamps) {
+                    for (final Long to : timestamps) {
+                        // build expected results indices based on time range
+                        final List<Integer> expectedRecordsIndices = new 
ArrayList<>();
+                        for (int i = 0; i < testCase.records.size(); i++) {
+                            final long recordValidTo = i == 0 ? 
testCase.nextTimestamp : testCase.records.get(i - 1).timestamp;
+                            if (testCase.records.get(i).timestamp <= to && 
recordValidTo > from) {
+                                if (testCase.records.get(i).value != null) { 
// the results do not include tombstones
+                                    expectedRecordsIndices.add(i);
+                                }
+                            }
+                        }
+                        // The only object that has access to the find method 
is the instance of LogicalSegmentIterator.
+                        // Therefore, closing the iterator means that the 
segment calling the find method is destroyed and needs
+                        // to be created for the next time range.
+                        final ReadonlyPartiallyDeserializedSegmentValue 
segmentValue = buildSegmentWithInsertLatest(testCase);
+                        if (order.equals(ResultOrder.ASCENDING)) {
+                            Collections.reverse(expectedRecordsIndices);
+                        }
+                        int segmentRecordIndex = -1;
+                        for (final int index : expectedRecordsIndices) {
+                            final long expectedValidTo = index == 0 ? 
testCase.nextTimestamp : testCase.records.get(index - 1).timestamp;
+
+                            final SegmentSearchResult result = 
segmentValue.find(from, to, order, segmentRecordIndex);
+                            assert result != null;

Review Comment:
   Should this be `assertThat(result, notNullValue());`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to