vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107672610

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   > Are you proposing that doPut() takes stream time as a parameter, so that during normal put() operation we pass observedStreamTime and during restore we pass endOfBatchStreamTime, which means we can rename streamTimeForRestore to be observedStreamTime instead?

   Went ahead and made this update in the latest commit. Can revise if it's not what you had envisioned.
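
   For readers following the thread, here is a minimal sketch of the shape being discussed: doPut() receives stream time as a parameter, put() passes observedStreamTime, and restoreBatch() passes endOfBatchStreamTime. This is an illustration only and not the code in the PR. The class name StreamTimeSketch, the accepted list, the -1 sentinel, and the exact grace-period comparison are all assumptions, and records are reduced to bare timestamps.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical toy model of the pattern discussed in the review; it is NOT the
// real RocksDBVersionedStore. Records are reduced to bare timestamps.
class StreamTimeSketch {
    private final long gracePeriodMs;
    // Assumption: -1 means "no record seen yet".
    private long observedStreamTime = -1L;
    // Timestamps that survived the grace-period check, in arrival order.
    final List<Long> accepted = new ArrayList<>();

    StreamTimeSketch(final long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    long observedStreamTime() {
        return observedStreamTime;
    }

    // Normal path: stream time advances first, then is passed to doPut().
    void put(final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        doPut(observedStreamTime, timestamp);
    }

    // Restore path: the whole batch is judged against the stream time the
    // store will have reached once the batch has been applied.
    void restoreBatch(final List<Long> timestamps) {
        long endOfBatchStreamTime = observedStreamTime;
        for (final long timestamp : timestamps) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, timestamp);
        }
        for (final long timestamp : timestamps) {
            observedStreamTime = Math.max(observedStreamTime, timestamp);
            doPut(endOfBatchStreamTime, timestamp);
        }
    }

    // The caller decides which notion of stream time applies.
    private void doPut(final long streamTime, final long timestamp) {
        if (timestamp < streamTime - gracePeriodMs) {
            return; // older than the grace period allows: drop
        }
        accepted.add(timestamp);
    }

    public static void main(final String[] args) {
        final StreamTimeSketch viaPut = new StreamTimeSketch(10L);
        for (final long ts : Arrays.asList(50L, 100L, 95L)) {
            viaPut.put(ts);
        }
        final StreamTimeSketch viaRestore = new StreamTimeSketch(10L);
        viaRestore.restoreBatch(Arrays.asList(50L, 100L, 95L));
        System.out.println("put:     " + viaPut.accepted);
        System.out.println("restore: " + viaRestore.accepted);
    }
}
```

   With a grace period of 10 and timestamps 50, 100, 95, the put() path in this sketch keeps all three records, while the restore path drops 50 because it is judged against the end-of-batch stream time of 100. Both paths leave observedStreamTime at 100.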