showuon commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r712843244



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   @vamossagar12, sorry for the late reply. I read through your changes again, and I'm wondering whether we could fetch a specific time range via the ranged `fetchAll(timeFrom, timeTo)` API, instead of fetching everything with `all()` and then filtering manually?
   
   For example:
   ```java
   private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
       // This is what you've done: fetch everything, then filter afterwards.
       // final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();

       final long observedStreamTime = getObservedStreamTime(wrapped());
       // No stream time has been observed yet, so nothing can have expired: return everything.
       if (observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
           return new MeteredWindowedKeyValueIterator<>(forward ? wrapped().all() : wrapped().backwardAll(), fetchSensor, streamsMetrics, serdes, time);

       final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;

       // What about this? Use fetchAll(timeFrom, timeTo) to fetch only the records in the time range,
       // by setting timeTo to the boundary we computed.
       final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary);

       return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);

       /*
       final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
       final List<KeyValue<Windowed<Bytes>, byte[]>> expiredRecords = new ArrayList<>();

       while (allWindowedKeyValueIterator.hasNext()) {
           final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
           if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
               expiredRecords.add(next);
           }
       }
       for (KeyValue<Windowed<Bytes>, byte[]> record: expiredRecords) {
           wrapped().put(record.key.key(), null, record.key.window().start());
       }
       */

       // return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time);
       // return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(expiredRecords.iterator()), fetchSensor, streamsMetrics, serdes, time);
   }
   ```
   
   
   Does that make sense?
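
To make the trade-off concrete, here is a minimal, self-contained sketch that does not use the Kafka classes at all. It assumes a `TreeMap` keyed by window start time as a stand-in for the store, and the class and method names (`RetentionRangeSketch`, `scanThenFilter`, `rangeFetch`) are hypothetical. It keeps the records at or after the boundary, which is what the filter in the diff keeps; which end of the range the boundary goes on depends on whether the live or the expired records are wanted. The real store filters on the window end time, so treat this only as an illustration of "scan and filter" versus "range lookup".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

final class RetentionRangeSketch {

    // Same formula as in the diff: the earliest timestamp still inside the retention period.
    public static long windowStartBoundary(final long observedStreamTime, final long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Scan every entry and skip the ones older than the boundary (the "fetch all, then filter" shape).
    public static List<String> scanThenFilter(final NavigableMap<Long, String> store, final long boundary) {
        final List<String> live = new ArrayList<>();
        for (final Map.Entry<Long, String> entry : store.entrySet()) {
            if (entry.getKey() < boundary) {
                continue;
            }
            live.add(entry.getValue());
        }
        return live;
    }

    // Ask the sorted structure for the range directly (the "ranged fetch" shape).
    public static List<String> rangeFetch(final NavigableMap<Long, String> store, final long boundary) {
        return new ArrayList<>(store.tailMap(boundary, true).values());
    }

    public static void main(final String[] args) {
        // Hypothetical data: keys are window start timestamps, values are payloads.
        final NavigableMap<Long, String> store = new TreeMap<>();
        store.put(80L, "a");
        store.put(91L, "b");
        store.put(100L, "c");

        final long boundary = windowStartBoundary(100L, 10L);
        System.out.println(scanThenFilter(store, boundary));
        System.out.println(rangeFetch(store, boundary));
    }
}
```

Both methods return the same records; the difference is that the range lookup never touches the entries before the boundary and needs no intermediate list built by hand.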



