vamossagar12 commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r713255569
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ########## @@ -292,13 +408,46 @@ public V fetch(final K key, time); } + private long getActualWindowStartTime(final long timeFrom) { + return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1); + } + + private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) { + final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll(); + + final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime(); + if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP) + return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time); + + final long windowStartBoundary = observedStreamTime - retentionPeriod + 1; + final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>(); + + while (allWindowedKeyValueIterator.hasNext()) { + final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next(); + if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) { + continue; + } + windowedKeyValuesInBoundary.add(next); + } + return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time); + } Review comment: @showuon that's an interesting idea. But, I think it needs to be in the opposite order ie, we need to fetch from windowStartBoundary -> end. We don't know the end timestamp in this case. (The above code is wrong as I checked in during some testing). So, what I mean to say is, this: `final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary);` should be `final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(windowStartBoundary, end) : wrapped().backwardFetchAll(windowStartBoundary, end);` But, we don't know the end. I ran it with Long.MAX_VALUE and the test case in question seems to have passed. Thanks for that! I will clean up this code again and send for review again. Thanks again! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org