mjsax commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106482112


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   Wondering if this would be correct?
   
   If we have `st = 100` and `grace = 10` and we do `put(k,v,95)`, the put is
   correct. If we restore at `st = 110`, we would still need to keep `k,v` and
   not drop it, even though its timestamp 95 is now "too old"?
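
A minimal sketch of the arithmetic behind this question, assuming the restore path applies the same check shown in the diff (`record.timestamp() < streamTimeForRestore - gracePeriod`). The class and method names here are hypothetical and are not part of the PR:

```java
public class GraceCheckSketch {

    // Hypothetical helper mirroring the condition in the diff:
    // a record is treated as expired when timestamp < streamTime - gracePeriod.
    static boolean isExpired(final long recordTimestamp, final long streamTime, final long gracePeriod) {
        return recordTimestamp < streamTime - gracePeriod;
    }

    public static void main(final String[] args) {
        final long gracePeriod = 10L;
        final long recordTimestamp = 95L;

        // Original put(): stream time is 100, cutoff is 90, so the record is accepted.
        System.out.println(isExpired(recordTimestamp, 100L, gracePeriod));

        // Restore: stream time is 110, cutoff is 100, so the same check would drop the record.
        System.out.println(isExpired(recordTimestamp, 110L, gracePeriod));
    }
}
```

The same record passes the check at the original `put()` but fails it if the check is evaluated against a later stream time during restore, which is the inconsistency the question points at.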



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Should we also test put-in-past for an existing record?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   If it was never written to the store, it should also not be in the changelog
   topic?
   
   This might still be useful if we read from the input topic for a KTable, I
   guess? But we might want to update the JavaDoc to mention this case?



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -615,11 +631,10 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToSegments(
                 }
 
                 if (foundMinTs < observedStreamTime - historyRetention) {
-                    // the record being inserted does not affect version history. discard and return
-                    if (expiredRecordSensor.isPresent()) {
-                        expiredRecordSensor.get().record(1.0d, context.currentSystemTimeMs());
-                        LOG.warn("Skipping record for expired put.");
-                    }
+                    // the record being inserted does not affect version history. discard and return.

Review Comment:
   Not sure I follow. Why did we record this in the sensor before, but no
   longer do?
   
   Same below (2x).



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored

Review Comment:
   Cf. comment above. The question seems to be "when" the original `put()`
   happened with regard to stream time?


