aliehsaeedii commented on code in PR #14957: URL: https://github.com/apache/kafka/pull/14957#discussion_r1420452896
########## streams/src/main/java/org/apache/kafka/streams/state/internals/LogicalSegmentIterator.java: ########## @@ -100,34 +134,49 @@ private boolean maybeFillIterator() { final byte[] rawSegmentValue = segment.get(key, snapshot); if (rawSegmentValue != null) { // this segment contains record(s) with the specified key if (segment.id() == -1) { // this is the latestValueStore - final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(rawSegmentValue); - if (recordTimestamp <= toTime) { - // latest value satisfies timestamp bound - queryResults.add(new VersionedRecord<>(RocksDBVersionedStore.LatestValueFormatter.getValue(rawSegmentValue), recordTimestamp)); - } + this.currentRawSegmentValue = rawSegmentValue; } else { - // this segment contains records with the specified key and time range - final List<RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult> searchResults = - RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue).findAll(fromTime, toTime); - for (final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult searchResult : searchResults) { - queryResults.add(new VersionedRecord<>(searchResult.value(), searchResult.validFrom(), searchResult.validTo())); - } + this.currentDeserializedSegmentValue = RocksDBVersionedStoreSegmentValueFormatter.deserialize(rawSegmentValue); + this.minTimestamp = RocksDBVersionedStoreSegmentValueFormatter.getMinTimestamp(rawSegmentValue); + this.nextTimestamp = RocksDBVersionedStoreSegmentValueFormatter.getNextTimestamp(rawSegmentValue); } - } - if (!queryResults.isEmpty()) { - break; + return true; } } - if (!queryResults.isEmpty()) { - // since data is stored in descending order in the segments, create the list in reverse order, if the order is Ascending. - this.iterator = order.equals(ResultOrder.ASCENDING) ? queryResults.listIterator(queryResults.size()) : queryResults.listIterator(); - return true; - } // if all segments have been processed, release the snapshot releaseSnapshot(); return false; } + private Object getNextRecord() { + VersionedRecord nextRecord = null; + if (currentRawSegmentValue != null) { // this is the latestValueStore + final long recordTimestamp = RocksDBVersionedStore.LatestValueFormatter.getTimestamp(currentRawSegmentValue); + if (recordTimestamp <= toTime) { + final byte[] value = RocksDBVersionedStore.LatestValueFormatter.getValue(currentRawSegmentValue); + // latest value satisfies timestamp bound + nextRecord = new VersionedRecord<>(value, recordTimestamp); + } + } else { + final RocksDBVersionedStoreSegmentValueFormatter.SegmentValue.SegmentSearchResult currentRecord = + currentDeserializedSegmentValue.find(fromTime, toTime, order); + if (currentRecord != null) { + nextRecord = new VersionedRecord<>(currentRecord.value(), currentRecord.validFrom(), currentRecord.validTo()); + } + } + // no relevant record can be found in the segment + if (currentRawSegmentValue != null || nextRecord == null || !canSegmentHaveMoreRelevantRecords(nextRecord.timestamp(), Long.parseLong(nextRecord.validTo().get().toString()))) { Review Comment: > `Long.parseLong(nextRecord.validTo().get().toString())` -- why to we get valid-to as `String` and pipe though `parseLong`? Why can't we just pass `nextRecord.validTo().get()` into `canSegmentHaveMoreRelevantRecords(...)`? I had to do that. Fixing generic type, fixed all these issues now :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org