showuon commented on a change in pull request #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r712843244
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java ########## @@ -292,13 +408,46 @@ public V fetch(final K key, time); } + private long getActualWindowStartTime(final long timeFrom) { + return Math.max(timeFrom, ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime() - retentionPeriod + 1); + } + + private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) { + final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll(); + + final long observedStreamTime = ((PersistentWindowStore<Bytes, byte[]>) wrapped()).getObservedStreamTime(); + if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP) + return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time); + + final long windowStartBoundary = observedStreamTime - retentionPeriod + 1; + final List<KeyValue<Windowed<Bytes>, byte[]>> windowedKeyValuesInBoundary = new ArrayList<>(); + + while (allWindowedKeyValueIterator.hasNext()) { + final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next(); + if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) { + continue; + } + windowedKeyValuesInBoundary.add(next); + } + return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(windowedKeyValuesInBoundary.iterator()), fetchSensor, streamsMetrics, serdes, time); + } Review comment: @vamossagar12, sorry for the late reply. I read through your changes again, and I'm wondering whether we could fetch only a specific time range via the range-based `fetchAll(timeFrom, timeTo)` API, instead of fetching everything with `all()` and then filtering the records manually. For example: ```java private KeyValueIterator<Windowed<K>, V> filterExpiredRecords(final boolean forward) { // this is what you've done, to fetch all, then filter later. 
// final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().all() : wrapped().backwardAll(); final long observedStreamTime = getObservedStreamTime(wrapped()); if (!allWindowedKeyValueIterator.hasNext() || observedStreamTime == ConsumerRecord.NO_TIMESTAMP) return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time); final long windowStartBoundary = observedStreamTime - retentionPeriod + 1; // what about this? (use fetchAll(Instant timeFrom, Instant timeTo) to fetch the records in time range, by setting timeTo to the boundary we computed) final KeyValueIterator<Windowed<Bytes>, byte[]> allWindowedKeyValueIterator = forward ? wrapped().fetchAll(0, windowStartBoundary) : wrapped().backwardFetchAll(0, windowStartBoundary); return new MeteredWindowedKeyValueIterator<>(allWindowedKeyValueIterator, fetchSensor, streamsMetrics, serdes, time); /* final long windowStartBoundary = observedStreamTime - retentionPeriod + 1; final List<KeyValue<Windowed<Bytes>, byte[]>> expiredRecords = new ArrayList<>(); while (allWindowedKeyValueIterator.hasNext()) { final KeyValue<Windowed<Bytes>, byte[]> next = allWindowedKeyValueIterator.next(); if (next.key.window().endTime().isBefore(Instant.ofEpochMilli(windowStartBoundary))) { expiredRecords.add(next); } } for (KeyValue<Windowed<Bytes>, byte[]> record: expiredRecords) { wrapped().put(record.key.key(), null, record.key.window().start()); } */ //return new MeteredWindowedKeyValueIterator<>(wrapped().all(), fetchSensor, streamsMetrics, serdes, time); // return new MeteredWindowedKeyValueIterator<>(new WindowedKeyValueIterator(expiredRecords.iterator()), fetchSensor, streamsMetrics, serdes, time); } ``` Does that make sense? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org