vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1106532591
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -297,6 +312,12 @@ void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
         // records into memory. how high this memory amplification will be is very much dependent
         // on the specific workload and the value of the "segment interval" parameter.
         for (final ConsumerRecord<byte[], byte[]> record : records) {
+            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
+                // record is older than grace period and was therefore never written to the store

Review Comment:
> If it was never written to the store, it should also not be in the changelog topic?

Ideally, but unfortunately no. Only the inner layer (`RocksDBVersionedStore`) contains the logic for deciding when the grace period has elapsed and a call to `put()` should return without updating the store. The changelogging layer wrapped around this inner layer does not know about the grace period, nor do any of the other outer layers. The changelogging layer does call `put()` before calling `log()`, but because `put()` returns `void`, it conveys no information about whether an update was actually made or whether `put()` simply returned without doing anything. So the changelogging layer calls `log()` in either case. This is the existing behavior for window stores, and it is what I had planned to replicate for versioned stores as well.

If we don't want this, we could:
* update `put()` to return a boolean indicating whether the update was actually performed (see the rough sketch below), or
* track observed stream time and grace period at an outer store layer, so that the changelogging layer does not call `log()` when it isn't needed.

I don't particularly like either option. Curious to hear your thoughts.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -615,11 +631,10 @@ private <T extends VersionedStoreSegment> PutStatus maybePutToSegments(
         }
         if (foundMinTs < observedStreamTime - historyRetention) {
-            // the record being inserted does not affect version history. discard and return
-            if (expiredRecordSensor.isPresent()) {
-                expiredRecordSensor.get().record(1.0d, context.currentSystemTimeMs());
-                LOG.warn("Skipping record for expired put.");
-            }
+            // the record being inserted does not affect version history. discard and return.

Review Comment:
With the changes in this PR, it is only possible to hit this case during restore. Previously, we passed `Optional.empty()` for the expiredRecordSensor during restore anyway, because we don't want to call the sensor while restoring, so I've simplified the code by removing it entirely. It is not possible to hit this case during non-restore because `doPut()` is not called if the record being put is older than the grace period, and history retention is always at least as large as the grace period. (See my comment above for why it is still possible to hit this case during restore.)
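To make the first option from the changelogging discussion above concrete (this does not relate to the expiredRecordSensor change), here is a minimal, hypothetical sketch assuming a `put()` that reports whether the write was applied. The class and interface names are illustrative only, not the actual changelogging store:

```java
// Hypothetical sketch only: a put() that returns whether the write was applied lets the
// changelogging wrapper avoid logging records the inner store dropped due to the grace period.
class ChangeLoggingLayerSketch {

    interface InnerVersionedStore {
        // returns false if the record was older than the grace period and therefore dropped
        boolean put(byte[] key, byte[] value, long timestamp);
    }

    private final InnerVersionedStore inner;

    ChangeLoggingLayerSketch(final InnerVersionedStore inner) {
        this.inner = inner;
    }

    void put(final byte[] key, final byte[] value, final long timestamp) {
        final boolean written = inner.put(key, value, timestamp);
        if (written) {
            log(key, value, timestamp); // only changelog records that were actually written to the store
        }
    }

    private void log(final byte[] key, final byte[] value, final long timestamp) {
        // write to the changelog topic (omitted in this sketch)
    }
}
```

The trade-off, as noted above, is an API change to `put()` solely to serve the changelogging layer.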
##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }

+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+        verifyGetValueFromStore("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD);
+        verifyGetNullFromStore("k2");
+    }
+
+    @Test
+    public void shouldRestoreEvenIfRecordWouldBeExpiredByEndOfBatch() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD)); // this record will be older than grace period by the end of the batch, but should still be restored
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+
+        store.restoreBatch(getChangelogRecords(records));
+
+        verifyGetValueFromStore("k2", "v2", HISTORY_RETENTION - GRACE_PERIOD);
+        verifyGetValueFromStore("k", "v", HISTORY_RETENTION + 10);
+    }
+
+    @Test
+    public void shouldAllowZeroHistoryRetention() {
+        // recreate store with zero history retention
+        store.close();
+        store = new RocksDBVersionedStore(STORE_NAME, METRICS_SCOPE, 0L, SEGMENT_INTERVAL);
+        store.init((StateStoreContext) context, store);
+
+        // put, get, and delete
+        putToStore("k", "v", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "v", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 1, "v", BASE_TIMESTAMP); // query in "future" is allowed
+
+        // update existing record at same timestamp
+        putToStore("k", "updated", BASE_TIMESTAMP);
+        verifyGetValueFromStore("k", "updated", BASE_TIMESTAMP);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP, "updated", BASE_TIMESTAMP);
+
+        // put new record version
+        putToStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyGetValueFromStore("k", "v2", BASE_TIMESTAMP + 2);
+        verifyTimestampedGetValueFromStore("k", BASE_TIMESTAMP + 2, "v2", BASE_TIMESTAMP + 2);
+
+        // query in past (history retention expired) returns null
+        verifyTimestampedGetNullFromStore("k", BASE_TIMESTAMP + 1);
+
+        // put in past (grace period expired) does not update the store

Review Comment:
Sure, I can add that. I was worried that the test case was already getting a bit long :)

##########
streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStoreTest.java:
##########
@@ -523,6 +537,63 @@ public void shouldRestoreMultipleBatches() {
         verifyTimestampedGetNullFromStore("k", SEGMENT_INTERVAL - 15);
     }

+    @Test
+    public void shouldNotRestoreExpired() {
+        final List<DataRecord> records = new ArrayList<>();
+        records.add(new DataRecord("k", "v", HISTORY_RETENTION + 10));
+        records.add(new DataRecord("k1", "v1", HISTORY_RETENTION + 10 - GRACE_PERIOD)); // grace period has not elapsed
+        records.add(new DataRecord("k2", "v2", HISTORY_RETENTION + 9 - GRACE_PERIOD)); // grace period has elapsed, so this record should not be restored

Review Comment:
That's correct.
This test case uses the same data as `shouldNotPutExpired()` above; the third record is expired even during normal put operations.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {

     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
Yeah, this logic is pretty nuanced. (I tried to clarify in the comments, but evidently not successfully.)

The `doPut()` method is not responsible for deciding when a put is too old according to the grace period; that check happens before `doPut()` is called. Inside `doPut()`, however, `observedStreamTime` is still used to decide when old records have fallen out of history retention. If a record has fallen out of history retention, we don't need to keep it in the store, and therefore `doPut()` returns.

In this restore logic, `streamTimeForRestore` is used to perform the grace period check. It would be incorrect to advance `streamTimeForRestore` at once for the entire batch, for the reason you gave above. In your example, we do still want to call `doPut()` for the record with `ts=95`. Assuming that is the first record in the restore batch, then `streamTimeForRestore=100` when `ts=95` is processed, so the record is still within the grace period and we call `doPut()` as we should. Only once we reach the later records in the restore batch will `streamTimeForRestore` be advanced past 100.

OTOH, `observedStreamTime` can be advanced to the end of the batch right away. This lets us optimize situations where a record near the beginning of the restore batch would be put into the store only to be immediately expired (based on history retention) by the end of the batch, so `doPut()` can skip inserting it. Here's an example:

* stream time is 50 at the start of the restore batch
* segment interval is 25
* stream time will be 100 by the end of the restore batch
* the restore batch contains a record `(k, v, 50)` and also `(k, v, 60)`

During restore, when we see `(k, v, 50)`, we have to put it into the store (it's the latest value for the key so far). Then when we see `(k, v, 60)`, we also have to put it into the store (it's the new latest value), but we do NOT have to move `(k, v, 50)` into a segment store, because the segment it would be moved into will be expired by the end of the restore process.

Here's another example: exactly the same as above, but the restore batch contains `(k, v, 60)` before `(k, v, 50)`, instead of after. When we see `(k, v, 60)`, we have to put it into the store. When we see `(k, v, 50)`, we still call `doPut()` because it's not expired based on grace period, but `doPut()` will see that it is expired based on history retention (using `observedStreamTime=100`, the value it will have by the end of the restore batch) and therefore returns without inserting it into the store.
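To keep the two clocks straight, here is a simplified, illustrative sketch of the restore flow described above. This is not the actual `RocksDBVersionedStore` code; the field and method names merely mirror the discussion, and `doPut()` is stubbed out:

```java
import java.util.Collection;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Illustrative sketch only: a per-record streamTimeForRestore drives the grace-period check,
// while observedStreamTime jumps to the batch maximum up front for the history-retention
// check performed inside doPut().
class VersionedRestoreSketch {
    private final long gracePeriod;
    private long observedStreamTime = -1L;

    VersionedRestoreSketch(final long gracePeriod) {
        this.gracePeriod = gracePeriod;
    }

    void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
        // grace-period checks use a copy of stream time that advances record by record
        long streamTimeForRestore = observedStreamTime;

        // observedStreamTime can be advanced to the batch maximum right away, so that doPut()
        // can skip work for versions that will already be expired by the end of the batch
        for (final ConsumerRecord<byte[], byte[]> record : records) {
            observedStreamTime = Math.max(observedStreamTime, record.timestamp());
        }

        for (final ConsumerRecord<byte[], byte[]> record : records) {
            streamTimeForRestore = Math.max(streamTimeForRestore, record.timestamp());
            if (record.timestamp() < streamTimeForRestore - gracePeriod) {
                // older than the grace period relative to the records seen so far, so it was
                // never written to the store on the active task and is skipped on restore
                continue;
            }
            doPut(record.key(), record.value(), record.timestamp());
        }
    }

    private void doPut(final byte[] key, final byte[] value, final long timestamp) {
        // actual insertion, including the history-retention check against observedStreamTime,
        // is omitted in this sketch
    }
}
```

In the `(k, v, 60)`-before-`(k, v, 50)` example above, both records pass the grace-period check in this sketch, and it is the (omitted) history-retention check inside `doPut()` that discards the older version.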