vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r702306432



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
   > @mjsax I have a question here. In the Jira ticket, you mentioned that the
   > best place to add this filtering is the MeteredStore, since that implicitly
   > applies the logic to custom state stores as well. For the most part, this
   > approach (fetching the relevant records and then filtering them in
   > MeteredStore) has worked fine, but there is a case where it fails: test
   > cases like `shouldNotThrowConcurrentModificationException`. This seems to
   > be because the put() call made while iterating appends to the wrapped
   > iterator instance, and hence it is not visible.
   > 
   > Looking at this, do you think it would be a good idea to move this logic
   > into the actual RocksDB implementations? Or do you think there is a better
   > way to do it here in the MeteredStore class itself?
   
   @showuon / @ableegoldman I would like to know your thoughts on this as well.
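
   To make the question more concrete: one option that might keep the filtering
   in the metered layer is to filter lazily, skipping expired entries as they
   are pulled from the wrapped iterator instead of copying everything into a
   list up front. The following is only a minimal sketch under stated
   assumptions, not code from this PR. `LazyExpiryFilter` and its
   `windowEndMs` accessor are hypothetical names, and plain `java.util`
   iterators stand in for `KeyValueIterator`. Whether this would actually
   resolve the failing test is untested.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.ToLongFunction;

// Hypothetical helper for illustration only; not part of Kafka Streams or this PR.
public class LazyExpiryFilter<T> implements Iterator<T> {
    private final Iterator<T> delegate;           // stands in for the wrapped store's iterator
    private final ToLongFunction<T> windowEndMs;  // assumed accessor for an entry's window end time
    private final long windowStartBoundary;       // observedStreamTime - retentionPeriod + 1

    private T buffered;          // next non-expired entry, once one has been found
    private boolean hasBuffered;

    public LazyExpiryFilter(final Iterator<T> delegate,
                            final ToLongFunction<T> windowEndMs,
                            final long windowStartBoundary) {
        this.delegate = delegate;
        this.windowEndMs = windowEndMs;
        this.windowStartBoundary = windowStartBoundary;
    }

    @Override
    public boolean hasNext() {
        // Pull from the delegate only on demand, skipping entries whose
        // window ended before the retention boundary.
        while (!hasBuffered && delegate.hasNext()) {
            final T candidate = delegate.next();
            if (windowEndMs.applyAsLong(candidate) >= windowStartBoundary) {
                buffered = candidate;
                hasBuffered = true;
            }
        }
        return hasBuffered;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        final T result = buffered;
        buffered = null;
        hasBuffered = false;
        return result;
    }

    public static void main(final String[] args) {
        // Each Long here plays the role of a window end timestamp.
        final List<Long> windowEnds = List.of(5L, 50L, 120L, 200L);
        final long observedStreamTime = 200L;
        final long retentionPeriod = 100L;
        final long boundary = observedStreamTime - retentionPeriod + 1; // 101

        final Iterator<Long> it =
            new LazyExpiryFilter<>(windowEnds.iterator(), Long::longValue, boundary);
        final List<Long> kept = new ArrayList<>();
        while (it.hasNext()) {
            kept.add(it.next());
        }
        System.out.println(kept); // prints [120, 200]
    }
}
```

   Because the sketch never materializes the results, it keeps delegating to
   the underlying iterator for every element, so whatever visibility that
   iterator gives to writes made during iteration would be preserved.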




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


