vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r713255569



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
       @showuon that's an interesting idea. But I think the range needs to be the other way around, i.e., we need to fetch from windowStartBoundary to the end, and we don't know the end timestamp in this case. (The code above is wrong; I checked it in during some testing.)
   
   What I mean is that this:
   
   `final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary);`
   
   should be
   
   `final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(windowStartBoundary, end) : wrapped().backwardFetchAll(windowStartBoundary, end);`
   
   
   But we don't know the end. I ran it with Long.MAX_VALUE as the end, and the test case in question seems to have passed. Thanks for that!
   
   I will clean up this code and send it for review again. Thanks again!
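
To make the range direction concrete, here is a minimal standalone sketch. It does not use the Kafka Streams classes: the `TreeMap` keyed by window start timestamp, the class name `RetentionBoundarySketch`, and the local `fetchAll` helper are hypothetical stand-ins, and treating both bounds as inclusive is an assumption of the sketch. Only the boundary expression `observedStreamTime - retentionPeriod + 1` is taken from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class RetentionBoundarySketch {

    // Earliest timestamp still inside retention; same expression as in the diff.
    static long windowStartBoundary(final long observedStreamTime, final long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Hypothetical stand-in for a time-range fetch: returns the values whose
    // window start timestamp lies in [timeFrom, timeTo], both bounds inclusive.
    static List<String> fetchAll(final NavigableMap<Long, String> store,
                                 final long timeFrom,
                                 final long timeTo) {
        return new ArrayList<>(store.subMap(timeFrom, true, timeTo, true).values());
    }

    public static void main(final String[] args) {
        // Toy store: window start timestamp -> value.
        final NavigableMap<Long, String> store = new TreeMap<>();
        store.put(10L, "a");
        store.put(50L, "b");
        store.put(61L, "c");
        store.put(100L, "d");

        // Observed stream time 100 and retention 40 are made-up numbers.
        final long boundary = windowStartBoundary(100L, 40L);

        // Range [0, boundary]: covers the windows that are already expired.
        System.out.println("0 -> boundary:   " + fetchAll(store, 0L, boundary));

        // Range [boundary, Long.MAX_VALUE]: covers only the live windows,
        // without needing to know the real end timestamp.
        System.out.println("boundary -> max: " + fetchAll(store, boundary, Long.MAX_VALUE));
    }
}
```

With these made-up numbers the boundary is 61, so the first range picks up the expired windows at 10 and 50, while the second range returns only the windows at 61 and 100. Using `Long.MAX_VALUE` as the upper bound avoids having to know the latest timestamp in the store.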
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


