vcrfxia commented on code in PR #13243:
URL: https://github.com/apache/kafka/pull/13243#discussion_r1107567122


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBVersionedStore.java:
##########
@@ -283,7 +293,12 @@ public void init(final StateStoreContext context, final StateStore root) {
 
     // VisibleForTesting
     void restoreBatch(final Collection<ConsumerRecord<byte[], byte[]>> records) {
-        // advance stream time to the max timestamp in the batch
+        // copy the observed stream time, for use in deciding whether to drop records during restore,
+        // when records have exceeded the store's grace period.
+        long streamTimeForRestore = observedStreamTime;

Review Comment:
   > I guess the question is, what is the value of `observedStreamTime` when we start the restore? Are you saying it's `-1` and we basically "replay" `observedStreamTime` during restore?
   
   Yes, that's exactly right. `observedStreamTime` is tracked locally per store. It is initialized to `-1` and only updated on `put()` or during restore. (This is the same as the existing behavior for window stores today.)
   
   > Maybe best to update some variable names?
   
   Are you proposing that `doPut()` should take stream time as a parameter, so that during normal `put()` operation we pass `observedStreamTime` and during restore we pass `endOfBatchStreamTime`? That would let us rename `streamTimeForRestore` to `observedStreamTime`. This SGTM; I just want to check that it's also what you have in mind, since we removed a number of parameters from `doPut()` in a previous PR revision in order to keep the parameter list small.
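
   To make the `doPut()` proposal concrete, here is a hypothetical sketch in which `doPut()` receives the stream time used for the grace-period check. It is not the actual store code: signatures are simplified to plain timestamps, and the class and accessor names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: doPut() takes the stream time to use for the
// grace-period check as a parameter. Names are illustrative only.
class StreamTimeParamStore {
    private long observedStreamTime = -1L;
    private final long gracePeriodMs;
    private final List<Long> stored = new ArrayList<>();

    StreamTimeParamStore(final long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Normal operation: advance observedStreamTime and pass it to doPut().
    void put(final long timestamp) {
        observedStreamTime = Math.max(observedStreamTime, timestamp);
        doPut(observedStreamTime, timestamp);
    }

    // Restore: compute the end-of-batch stream time first, then check every
    // record in the batch against it (looking ahead within the batch).
    void restoreBatch(final List<Long> batchTimestamps) {
        long endOfBatchStreamTime = observedStreamTime;
        for (final long timestamp : batchTimestamps) {
            endOfBatchStreamTime = Math.max(endOfBatchStreamTime, timestamp);
        }
        for (final long timestamp : batchTimestamps) {
            doPut(endOfBatchStreamTime, timestamp);
        }
        observedStreamTime = endOfBatchStreamTime;
    }

    // Shared write path: drop records older than (streamTime - grace period).
    private void doPut(final long streamTime, final long timestamp) {
        if (timestamp < streamTime - gracePeriodMs) {
            return;
        }
        stored.add(timestamp);
    }

    long observedStreamTime() {
        return observedStreamTime;
    }

    List<Long> stored() {
        return stored;
    }
}
```

   In this sketch, restoring the batch 80, 100, 95 with a 10 ms grace period drops 80 even though it comes first: the check uses the end-of-batch stream time (100), whereas a record-by-record replay would have accepted it because stream time was still 80 at that point.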
   
   > I guess follow up work (independent for this KIP) might be, to actually make use of KS runtime streamTime instead of tracking inside the store, and thus won't need `observedStreamTime` any longer, as we could look ahead to the "end-of-restore stream-time" (not just "end-of batch").
   
   What's the scope of the "streamTime" tracked by the KS runtime? Is it per task? Per processor? Global? I'm wondering how this would work with multiple partitions, or with multiple processors where some processors are expected to see new data earlier than other (downstream) processors.
   
   I guess we'd also first need to implement the change from your other comment about not writing expired records (based on the grace period) into the changelog topic before we can make this change. Otherwise, we would have no way to determine during restore whether records are expired.
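
   A hypothetical sketch of that ordering: stream time is passed in by the caller (standing in for the KS runtime; this is not an existing API), and `put()` drops expired records before appending to a list that stands in for the changelog topic. Because only non-expired records are ever logged, restore in this model can apply the changelog without knowing stream time.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: the caller (standing in for the Streams runtime)
// supplies stream time, and expired records are dropped *before* they reach
// the changelog. Not an existing Kafka Streams API; names are illustrative.
class FilterBeforeLoggingStore {
    private final long gracePeriodMs;
    private final List<Long> contents = new ArrayList<>();
    // Stands in for the changelog topic.
    private final List<Long> changelog = new ArrayList<>();

    FilterBeforeLoggingStore(final long gracePeriodMs) {
        this.gracePeriodMs = gracePeriodMs;
    }

    // Expired records are neither stored nor logged.
    void put(final long runtimeStreamTime, final long timestamp) {
        if (timestamp < runtimeStreamTime - gracePeriodMs) {
            return;
        }
        contents.add(timestamp);
        changelog.add(timestamp);
    }

    // Restore needs no stream time: every logged record already passed the
    // grace-period check when it was written.
    void restore(final List<Long> changelogRecords) {
        contents.addAll(changelogRecords);
    }

    List<Long> contents() {
        return contents;
    }

    List<Long> changelog() {
        return changelog;
    }
}
```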



-- 
This is an automated message from the Apache Git Service.