vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106532591


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
   > If it was never written to the store, it should also not be in the changelog topic?
   
   Ideally, but unfortunately no. Only the inner layer (RocksDBVersionedStore) contains the logic for deciding when the grace period has elapsed and a call to `put()` should return without updating the store. The changelogging layer wrapped around this inner layer does not know about the grace period, nor do any of the other outer layers. The changelogging layer does call `put()` before calling `log()`, but because `put()` returns void, it does not convey whether an update was actually made or whether `put()` simply returned without doing anything. So, the changelogging layer calls `log()` in either case.
   
   This is the existing behavior for window stores, and what I had planned to replicate for versioned stores as well. If we don't want this, we could:
   * update `put()` to return a boolean indicating whether the update was actually performed (see the sketch after this list), or
   * track observed stream time and grace period at an outer store layer, in order to not call `log()` at the changelogging layer when it's not needed.
   
   I don't particularly like either option. Curious to hear your thoughts.
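   
   For concreteness, here's a minimal sketch of the first option. This is a hypothetical illustration only; `ChangeLoggingLayerSketch` and `InnerVersionedStore` are made-up names, not the actual Kafka Streams types:
   
   ```java
   // Hypothetical sketch: a boolean-returning put() lets the changelogging
   // layer skip log() when the inner store dropped the record.
   class ChangeLoggingLayerSketch {

       interface InnerVersionedStore {
           // returns true iff the record was actually written to the store
           boolean put(byte[] key, byte[] value, long timestamp);
       }

       private final InnerVersionedStore inner;

       ChangeLoggingLayerSketch(final InnerVersionedStore inner) {
           this.inner = inner;
       }

       void put(final byte[] key, final byte[] value, final long timestamp) {
           // the boolean tells us whether the inner store dropped the record
           // for being older than the grace period
           if (inner.put(key, value, timestamp)) {
               log(key, value, timestamp); // only changelog records that were stored
           }
       }

       private void log(final byte[] key, final byte[] value, final long timestamp) {
           // write to the changelog topic (elided)
       }
   }
   ```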



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -615,11 +631,10 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToSegments(
                 }
 
                 if (foundMinTs < observedStreamTime - historyRetention) {
-                    // the record being inserted does not affect version history. discard and return
-                    if (expiredRecordSensor.isPresent()) {
-                        expiredRecordSensor.get().record(1.0d, context.currentSystemTimeMs());
-                        LOG.warn("Skipping record for expired put.");
-                    }
+                    // the record being inserted does not affect version history. discard and return.

Review Comment:
   With the changes in this PR, it is now only possible to hit this case during restore. Previously, we passed `Optional.empty()` for the expiredRecordSensor during restore anyway, because we don't want to call the sensor while restoring. So I've simplified the code by removing it entirely.
   
   The reason it is not possible to hit this case outside of restore is that `doPut()` is not called if the record being put is older than the grace period, and history retention is always at least as large as the grace period. (See my comment above for why it is still possible to hit this case during restore.)
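   
   A minimal sketch of that invariant, with made-up names (not the actual RocksDBVersionedStore code):
   
   ```java
   // Illustration only: why a record that survives the grace-period check
   // on the put path can never be older than history retention.
   class GracePeriodGuardSketch {
       private final long gracePeriod;
       private final long historyRetention; // invariant: historyRetention >= gracePeriod
       private long observedStreamTime = -1L; // illustrative sentinel

       GracePeriodGuardSketch(final long gracePeriod, final long historyRetention) {
           if (historyRetention < gracePeriod) {
               throw new IllegalArgumentException("history retention must be >= grace period");
           }
           this.gracePeriod = gracePeriod;
           this.historyRetention = historyRetention;
       }

       void put(final long timestamp) {
           observedStreamTime = Math.max(observedStreamTime, timestamp);
           if (timestamp < observedStreamTime - gracePeriod) {
               return; // too old: dropped here, before doPut() is ever reached
           }
           // reaching this point implies
           //   timestamp >= observedStreamTime - gracePeriod
           //             >= observedStreamTime - historyRetention,
           // which is the essence of why the expired branch is unreachable
           // outside of restore
           doPut(timestamp);
       }

       private void doPut(final long timestamp) {
           // storage logic elided
       }
   }
   ```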



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - 
GRACE_PERIOD)); // grace period has elapsed, so this record should not be 
restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - 
GRACE_PERIOD)); // this record will be older than grace period by the end of 
the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", 
BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", 
BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", 
BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", 
BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
   Sure, I can add that. I was worried that the test case was already getting a 
bit long :)



##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }
 
+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - 
GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - 
GRACE_PERIOD)); // grace period has elapsed, so this record should not be 
restored

Review Comment:
   That's correct. This test case uses the same data as `shouldNotPutExpired()` 
above. This third record is expired even during normal put operations.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   Yeah this logic is pretty nuanced. (I tried to clarify in the comments but 
evidently not successfully.)
   
   The `doPut()` method is not responsible for deciding when a put is too old 
(according to grace period); that check happens before `doPut()` is called. 
Inside the `doPut()` method, however, `observedStreamTime` is still used to 
decide when old records have fallen out of history retention. If a record has 
fallen out of history retention, then we don't need to keep it in the store, 
and therefore `doPut()` returns.
   
   In the restore logic here, `streamTimeForRestore` is used to perform the grace period check. It would be incorrect to advance `streamTimeForRestore` all at once for the entire batch, for the reason you gave above. In your example, we do still want to call `doPut()` for the record with `ts=95`. Assuming that is the first record in the restore batch, then `streamTimeForRestore=100`, so `ts=95` is still within the grace period and we call `doPut()` as we should. Only once we reach the later records in the restore batch will `streamTimeForRestore` be advanced past 100.
   
   OTOH, `observedStreamTime` can be advanced to the end of the batch right away. This allows us to optimize situations where a record near the beginning of the restore batch would be put into the store, only to be immediately expired (based on history retention) by the end of the restore batch; in that case `doPut()` can skip putting it into the store. Here's an example:
   * stream time is 50 at the start of the restore batch
   * segment interval is 25
   * stream time will be 100 by the end of the restore batch
   * restore batch contains a record `(k, v, 50)` and also `(k, v, 60)`.
   
   During restore when we see `(k, v, 50)`, we have to put it into the store 
(it's the latest value for the key so far). Then when we see `(k, v, 60)`, we 
also have to put it into the store (it's the new latest value) but we do NOT 
have to move `(k, v, 50)` into a segment store, because the segment that it 
would be moved into will be expired by the end of the restore process.
   
   Here's another example: exactly the same as above, except the restore batch contains `(k, v, 60)` before `(k, v, 50)`, instead of after. When we see `(k, v, 60)` we have to put it into the store. When we see `(k, v, 50)`, we still call `doPut()` because it's not expired based on grace period, but `doPut()` will see that it is expired based on history retention (using `observedStreamTime=100`, the value it will have by the end of the restore batch) and therefore `doPut()` returns without inserting into the store.
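   
   Putting the two "clocks" together, here's a compressed sketch of the restore loop. The variable names mirror the PR, but the structure is simplified and the surrounding store logic is elided:
   
   ```java
   import java.util.Collection;

   import org.apache.kafka.clients.consumer.ConsumerRecord;

   // Simplified sketch, not the actual RocksDBVersionedStore code.
   class RestoreClocksSketch {
       private final long gracePeriod;
       private long observedStreamTime = -1L; // illustrative sentinel

       RestoreClocksSketch(final long gracePeriod) {
           this.gracePeriod = gracePeriod;
       }

       void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
           // grace-period clock: starts from the pre-batch stream time and
           // advances record-by-record, so an early record in the batch is
           // not judged against stream time that only later records establish
           long streamTimeForRestore = observedStreamTime;

           // history-retention clock: advanced to the end-of-batch max up
           // front, so doPut() can skip moving records into segments that
           // will already be expired once the batch is fully applied
           for (final ConsumerRecord<byte[], byte[]> record : records) {
               observedStreamTime = Math.max(observedStreamTime, record.timestamp());
           }

           for (final ConsumerRecord<byte[], byte[]> record : records) {
               streamTimeForRestore = Math.max(streamTimeForRestore, record.timestamp());
               if (record.timestamp() < streamTimeForRestore - gracePeriod) {
                   continue; // older than grace period: was never written to the store
               }
               // doPut() may still no-op if the record falls outside history
               // retention relative to the end-of-batch observedStreamTime
               doPut(record);
           }
       }

       private void doPut(final ConsumerRecord<byte[], byte[]> record) {
           // storage logic elided
       }
   }
   ```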



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
