vamossagar12 commented on code in PR #11211: URL: https://github.com/apache/kafka/pull/11211#discussion_r855009539
########## streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java: ##########

```diff
@@ -181,15 +189,16 @@ public void testRolling() {
             ),
             segmentDirs(baseDir)
         );
-
+        // expired record
         assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
+            new HashSet<>(Collections.emptyList()),
```

Review Comment:
   Sure @showuon. Let's consider the test case `shouldOnlyIterateOpenSegments`. When I run it with a very high retention period (e.g. 5 * SEGMENT_INTERVAL), the test fails with this assertion:
   ```
   expected:<KeyValue(60000, two)> but was:<KeyValue(0, one)>
   Expected :KeyValue(60000, two)
   Actual   :KeyValue(0, one)
   ```
   The test case expects that after the 4th put [here](https://github.com/apache/kafka/pull/11211/files/af361d42295b1f116ecbed6b5721f66b093ba98b#diff-ba861b0f1a8d8f4ae8f2b8464e168c29fc5470cad94ffda1c8fa0b3c7c15b72bR135), the older segment is expired and the fetch returns only records 2 and 3. Looking at the values further: with the high retention, the retention period is 300k and the effective segmentInterval is 150k for all stores. The 4th put does create a new segment (since 180k / 150k rolls to segment 1), but the older segment still does not get expired because observedStreamTime (180k) < retentionPeriod (300k), so the fetch also returns the record from the first put. This is how the store iterators currently work, and the changes in this PR do not affect that behaviour.
   
   With my change, for TimeOrderedWindowStores, this comment should explain why I needed to change the test case:
   ```
   // In the case of RocksDBTimeOrderedWindowStoreWithIndex, there's an extra get call to check whether a key exists in the base store.
   // That record is expired because its ts is 60,000, which is < (observedStreamTime - retention + 1) == 60,001,
   // and hence that record is also not returned.
   ```
   To have the test cases work as-is with a high retention, I would need to tweak the SEGMENT_INTERVAL value so that the retentionPeriod kicks in.
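   The expiry condition quoted in that code comment can be sketched as a small check. This is a hypothetical helper for illustration, not the actual Kafka Streams internals; the class and method names are made up:
   ```java
   public class WindowExpirySketch {

       // A record is considered expired once its timestamp falls below
       // observedStreamTime - retentionPeriod + 1 (the cutoff quoted above).
       static boolean isExpired(long recordTs, long observedStreamTime, long retentionPeriod) {
           return recordTs < observedStreamTime - retentionPeriod + 1;
       }

       public static void main(String[] args) {
           // Values from the comment: observedStreamTime = 180k, retention = 120k
           // => cutoff is 60,001, so the record at ts 60,000 is expired.
           System.out.println(isExpired(60_000L, 180_000L, 120_000L)); // true
           // Under the high retention of 300k even the record at ts 0 survives,
           // which is why the fetch still returned KeyValue(0, one).
           System.out.println(isExpired(0L, 180_000L, 300_000L)); // false
       }
   }
   ```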
   Currently, the effective segmentInterval at the store level is defined by `Math.max(retentionPeriod / 2, segmentInterval)`, and I think that's why, in the test-case initialization, retention is set to 2 * SEGMENT_INTERVAL, which fits the test case neatly: at observedStreamTime = 180k with retention = 120k, segment 3 gets rolled out and all segments prior to ts 60k (180k - 120k) get dropped, so the test passes. The other test cases would follow suit, and of course the expiry of records from this PR would also kick in.
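   The segment arithmetic behind both scenarios can be sketched as follows. This is an illustrative sketch assuming SEGMENT_INTERVAL = 60k (the class and helper names are made up, not the Kafka Streams internals):
   ```java
   public class SegmentRollSketch {

       // Effective segment interval as described in the comment:
       // Math.max(retentionPeriod / 2, segmentInterval)
       static long effectiveSegmentInterval(long retentionPeriod, long segmentInterval) {
           return Math.max(retentionPeriod / 2, segmentInterval);
       }

       // Segment id a given timestamp lands in.
       static long segmentId(long ts, long segmentInterval) {
           return ts / segmentInterval;
       }

       public static void main(String[] args) {
           long segmentInterval = 60_000L; // assumed SEGMENT_INTERVAL

           // High-retention run: retention = 5 * 60k = 300k
           // => effective interval = max(150k, 60k) = 150k,
           // and the 4th put at ts 180k rolls to segment 180k / 150k = 1.
           long highRetention = 5 * segmentInterval;
           System.out.println(effectiveSegmentInterval(highRetention, segmentInterval)); // 150000
           System.out.println(segmentId(180_000L, 150_000L)); // 1

           // Default setup: retention = 2 * 60k = 120k
           // => at observedStreamTime 180k, everything before ts 60k
           //    (180k - 120k) is dropped, which is what the test expects.
           long defaultRetention = 2 * segmentInterval;
           System.out.println(180_000L - defaultRetention); // 60000
       }
   }
   ```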