vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1105189382


##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - 
GRACE_PERIOD)); // grace period has elapsed, so this record should not be 
restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - 
GRACE_PERIOD)); // this record will be older than grace period by the end of 
the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {

Review Comment:
   Added this extra test in response to a previous PR review comment. 
   
   This is an interesting edge case in that if history retention = grace period 
= 0, then we don't actually need the segments store because grace period = 0 
means we don't need to store tombstones. (Even if a tombstone is the latest 
value for a given key, the store will never accept earlier writes so the store 
doesn't need to keep the tombstone after clearing the current value for the 
key.)
   
   Is it worth it to add extra code to remove the segments store in this case? 
My instinct says no because this case does not seem very practical. For a user 
to use grace period = 0 requires that they are confident that all records 
within a partition, even across keys, are produced in ascending (technically, 
non-decreasing) timestamp order. I'm not sure how many use cases meet this 
criterion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to