vamossagar12 commented on a change in pull request #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r702306432
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
##########
@@ -292,13 +408,46 @@ public V fetch(final K key,
             time);
     }
 
+    private long getActualWindowStartTime(final long timeFrom) {
+        return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1);
+    }
+
+    private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) {
+        final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll();
+
+        final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime();
+        if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP)
+            return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time);
+
+        final long windowStartBoundary = observedStreamTime - retentionPeriod + 1;
+        final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>();
+
+        while (allWindowedKeyValueIterator.hasNext()) {
+            final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next();
+            if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) {
+                continue;
+            }
+            windowedKeyValuesInBoundary.add(next);
+        }
+        return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time);
+    }

Review comment:
> @mjsax I have a question here. In the Jira ticket, you mentioned that the best place to add this filtering is the MeteredStore, since that implicitly applies the logic to custom state stores as well. For the most part, this kind of filtering (fetching the relevant records and then filtering them in the MeteredStore) has worked fine, but there is one case where it fails: test cases like `shouldNotThrowConcurrentModificationException`. This seems to be because the record written by a put() call made while iterating only shows up in the wrapped store's iterator, whereas filterExpiredRecords() has already drained that iterator into a list, so the new record is never visible.
>
> Looking at this, do you think it would be a good idea to move this logic into the actual RocksDB implementations? Or is there a better way to do it here in the MeteredStore class itself? @showuon / @ableegoldman, I would like to hear your thoughts on this as well.
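One option for keeping the filtering in the MeteredStore is to filter lazily instead of draining the wrapped iterator into a list. The sketch below illustrates the difference using only `java.util` types (Java 16+). It is a hypothetical illustration, not Kafka code: `WindowedRecord`, `LazyFilteringIterator`, `liveIterator` and `iterateWithConcurrentPut` are made-up names, and `liveIterator` stands in for a store iterator under the assumption that it reflects put() calls made during iteration. The retention boundary uses the same arithmetic and keep/skip condition as the diff.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

final class LazyRetentionFilterDemo {
    // Hypothetical stand-in for a windowed key-value pair (not Kafka's Windowed/KeyValue types).
    record WindowedRecord(String key, long windowStart, long windowEnd) { }

    // Same arithmetic as the diff: observedStreamTime - retentionPeriod + 1.
    static long windowStartBoundary(long observedStreamTime, long retentionPeriod) {
        return observedStreamTime - retentionPeriod + 1;
    }

    // Decorator that skips expired entries one at a time while the caller iterates,
    // instead of draining the inner iterator into a list up front.
    static final class LazyFilteringIterator<T> implements Iterator<T> {
        private final Iterator<T> inner;
        private final Predicate<T> keep;
        private T lookahead;          // next entry that passed the filter
        private boolean hasLookahead;

        LazyFilteringIterator(Iterator<T> inner, Predicate<T> keep) {
            this.inner = inner;
            this.keep = keep;
        }

        @Override
        public boolean hasNext() {
            // Advance the inner iterator only as far as the next entry to keep.
            while (!hasLookahead && inner.hasNext()) {
                T candidate = inner.next();
                if (keep.test(candidate)) {
                    lookahead = candidate;
                    hasLookahead = true;
                }
            }
            return hasLookahead;
        }

        @Override
        public T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            T result = lookahead;
            lookahead = null;
            hasLookahead = false;
            return result;
        }
    }

    // Hypothetical "live" source that reads by index, so records appended
    // during iteration are visible (assumed behaviour of the wrapped store).
    static Iterator<WindowedRecord> liveIterator(List<WindowedRecord> log) {
        return new Iterator<>() {
            private int index = 0;
            @Override public boolean hasNext() { return index < log.size(); }
            @Override public WindowedRecord next() {
                if (!hasNext()) throw new NoSuchElementException();
                return log.get(index++);
            }
        };
    }

    // Iterates with either strategy and performs one put() after reading the first record.
    static List<String> iterateWithConcurrentPut(boolean lazy) {
        List<WindowedRecord> log = new ArrayList<>();
        log.add(new WindowedRecord("expired", 0, 10));
        log.add(new WindowedRecord("a", 90, 100));
        long boundary = windowStartBoundary(100, 50); // observed time 100, retention 50 -> 51
        // Keep a record unless its window ends before the boundary, as in the diff.
        Predicate<WindowedRecord> notExpired = r -> r.windowEnd() >= boundary;

        Iterator<WindowedRecord> it;
        if (lazy) {
            it = new LazyFilteringIterator<>(liveIterator(log), notExpired);
        } else {
            // Eager variant, as in the diff: filter everything into a list first.
            List<WindowedRecord> copy = new ArrayList<>();
            liveIterator(log).forEachRemaining(r -> { if (notExpired.test(r)) copy.add(r); });
            it = copy.iterator();
        }

        List<String> seen = new ArrayList<>();
        while (it.hasNext()) {
            seen.add(it.next().key());
            if (log.size() == 2) {
                log.add(new WindowedRecord("b", 95, 105)); // put() while iterating
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("eager: " + iterateWithConcurrentPut(false)); // eager: [a]
        System.out.println("lazy:  " + iterateWithConcurrentPut(true));  // lazy:  [a, b]
    }
}
```

Whether this would make `shouldNotThrowConcurrentModificationException` pass depends on the assumption above, namely that the wrapped store's iterator actually sees puts made after it was opened. A real version would also need to implement `KeyValueIterator` (including `peekNextKey()` and `close()`) and forward `close()` to the wrapped iterator. As far as the hunk shows, the eager version does not close `allWindowedKeyValueIterator` after draining it.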