vamossagar12 commented on code in PR #11211:
URL: https://github.com/apache/kafka/pull/11211#discussion_r855009539


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java:
##########
@@ -181,15 +189,16 @@ public void testRolling() {
             ),
             segmentDirs(baseDir)
         );
-
+        // expired record
         assertEquals(
-            new HashSet<>(Collections.singletonList("zero")),
+            new HashSet<>(Collections.emptyList()),

Review Comment:
   Sure @showuon. Let's consider the test case `shouldOnlyIterateOpenSegments`. When I run it with a very high retention period (like 5 * SEGMENT_INTERVAL), the test fails with this assertion:
   
   ```
   expected:<KeyValue(60000, two)> but was:<KeyValue(0, one)>
   Expected :KeyValue(60000, two)
   Actual   :KeyValue(0, one)
   ```
   
   So, the test case expects that after the 4th put [here](https://github.com/apache/kafka/pull/11211/files/af361d42295b1f116ecbed6b5721f66b093ba98b#diff-ba861b0f1a8d8f4ae8f2b8464e168c29fc5470cad94ffda1c8fa0b3c7c15b72bR135), the older segment should be expired and the fetch should return only records 2 and 3. Looking at the values further, retention is 300k and segmentInterval is 150k for all stores. The 4th put does lead to the creation of a new segment (as 180k/150k rolls into segment 1), but the older segment still doesn't get expired because observedStreamTime (180k) < retentionPeriod (300k), which means the fetch returns the first put as well.
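   Here's a rough sketch of the arithmetic as I understand it (plain Java, illustrative values only, not the actual store internals):
   
   ```java
   // Illustrative numbers from the scenario above -- not actual store code.
   long observedStreamTime = 180_000L;
   long retentionPeriod = 300_000L;
   long segmentInterval = 150_000L;
   
   // The 4th put at ts = 180k lands in a new segment:
   long segmentId = observedStreamTime / segmentInterval; // 180k / 150k -> segment 1
   
   // But a record only expires once it falls behind the retention window:
   long expiryBoundary = observedStreamTime - retentionPeriod + 1; // = -119_999
   
   // ts = 0 is not < -119_999, so the first put survives and the fetch
   // returns KeyValue(0, one) as well.
   ```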
   
   I think this is how the store iterators currently work, and the changes in this PR do not impact this behaviour. With my change, for TimeOrderedWindowStores, this comment should explain why I needed to change the test case:
   
   ```
   // In case of RocksDBTimeOrderedWindowStoreWithIndex, there's an extra get
   // call to check if a key exists in the base store or not.
   // Now, that record is expired, as its ts is 60,000, which
   // is < (observedTime - retention + 1) == 60,001,
   // and hence that record is also not returned.
   ```
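   To make the boundary in that comment concrete (this assumes the test's default retention of 2 * SEGMENT_INTERVAL = 120k, not my high-retention experiment above):
   
   ```java
   // Boundary from the quoted comment; retention of 120_000 is the test default.
   long observedStreamTime = 180_000L;
   long retention = 120_000L;
   long expiryBoundary = observedStreamTime - retention + 1; // = 60_001
   
   // The record at ts = 60_000 satisfies 60_000 < 60_001, so it counts as
   // expired and the extra get against the base store doesn't return it.
   ```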
   
   I think for the test cases to work as-is with a high retention, I will need to change/tweak the SEGMENT_INTERVAL value so that the retentionPeriod kicks in. Currently, the effective segmentInterval set at the store level is defined by
   
   `Math.max(retentionPeriod/2, segmentInterval)`
   
   and I think that's the reason retention is set to 2 * SEGMENT_INTERVAL in the test case initialization, which neatly fits the test case: at observedStreamTime = 180k with retention = 120k, segment 3 gets rolled out and all segments prior to ts 60k (180k - 120k) get dropped, which makes the test case pass.
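   As a sketch of that interplay (the helper below is hypothetical and just mirrors the formula above; imagine it inside a test class):
   
   ```java
   // Hypothetical helper mirroring Math.max(retentionPeriod/2, segmentInterval);
   // the real computation lives in the store supplier.
   static long effectiveSegmentInterval(final long retentionPeriod, final long segmentInterval) {
       return Math.max(retentionPeriod / 2, segmentInterval);
   }
   
   // Test default: retention = 2 * SEGMENT_INTERVAL (SEGMENT_INTERVAL = 60_000):
   effectiveSegmentInterval(120_000L, 60_000L); // = 60_000 -> at streamTime 180k,
   // everything before ts 60k (180k - 120k) is dropped, so the test passes.
   
   // My experiment: retention = 5 * SEGMENT_INTERVAL:
   effectiveSegmentInterval(300_000L, 60_000L); // = 150_000 -> the retention window
   // start (180k - 300k) is still negative, so nothing expires yet and the
   // first put is still returned.
   ```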
   
   Other test cases would also follow suit, and of course the expiry of records from this PR would also kick in.


