aliehsaeedii commented on code in PR #14626:
URL: https://github.com/apache/kafka/pull/14626#discussion_r1384197764


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -253,6 +259,86 @@ public VersionedRecord<byte[]> get(final Bytes key, final 
long asOfTimestamp) {
         return null;
     }
 
+    // Visible for testing
+    ValueIterator<VersionedRecord<byte[]>> get(final Bytes key, final long 
fromTimestamp, final long toTimestamp, final boolean isAscending) {
+
+        Objects.requireNonNull(key, "key cannot be null");
+        validateStoreOpen();
+
+        final List<VersionedRecord<byte[]>> queryResults = new ArrayList<>();
+
+        if (toTimestamp < observedStreamTime - historyRetention) {
+            // history retention exceeded. we still check the latest value 
store in case the
+            // latest record version satisfies the timestamp bound, in which 
case it should
+            // still be returned (i.e., the latest record version per key 
never expires).
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = 
LatestValueFormatter.getTimestamp(rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    // latest value satisfies timestamp bound
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp
+                    ));
+                }
+            }
+
+            // history retention has elapsed and the latest record version (if 
present) does
+            // not satisfy the timestamp bound. return null for 
predictability, even if data
+            // is still present in segments.
+            if (queryResults.size() == 0) {
+                LOG.warn("Returning null for expired get.");
+            }
+            if (!isAscending) {
+                queryResults.sort((r1, r2) -> (int) (r1.timestamp() - 
r2.timestamp()));
+            }
+            return new VersionedRecordIterator<>(queryResults);
+        } else {
+            // first check the latest value store
+            final byte[] rawLatestValueAndTimestamp = 
latestValueStore.get(key);
+            if (rawLatestValueAndTimestamp != null) {
+                final long latestTimestamp = LatestValueFormatter.getTimestamp(
+                    rawLatestValueAndTimestamp);
+                if (latestTimestamp <= toTimestamp) {
+                    queryResults.add(new VersionedRecord<>(
+                        
LatestValueFormatter.getValue(rawLatestValueAndTimestamp),
+                        latestTimestamp));
+                }
+            }
+
+            // check segment stores
+            final List<LogicalKeyValueSegment> segments = 
segmentStores.segments(Long.MIN_VALUE,
+                toTimestamp, false);
+            for (final LogicalKeyValueSegment segment : segments) {
+                final byte[] rawSegmentValue = segment.get(key);
+                if (rawSegmentValue != null) {
+
+                    if 
(RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue) < 
fromTimestamp) {
+                        // this segment contains no data for the queried time 
range, so earlier segments
+                        // cannot either
+                        break;
+                    }
+
+                    // the desired result is contained in this segment
+                    final List<SegmentSearchResult> searchResults = 
RocksDBVersionedStoreSegmentValueFormatter
+                                                                    
.deserialize(rawSegmentValue)
+                                                                    
.findAll(fromTimestamp, toTimestamp);
+                    for (final SegmentSearchResult searchResult : 
searchResults) {
+                        if (searchResult.value() != null && 
searchResult.validFrom() <= toTimestamp && searchResult.validTo() >= 
fromTimestamp) {

Review Comment:
   > When could `searchResult.value()` be `null`? Is is the tombstone case?
   
   Yes I think.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to