AyoubOm commented on code in PR #15361:
URL: https://github.com/apache/kafka/pull/15361#discussion_r1490716253


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java:
##########
@@ -150,5 +150,11 @@ private void initTopology() {
         }
     }
 
+    @Override
+    public void maybeCheckpoint() {
+        if (StateManagerUtil.checkpointNeeded(false, stateMgr.changelogOffsets(), offsets)) {

Review Comment:
   Hi Jeff,
   For GlobalStateManager, the method _changelogOffsets()_ returns the last 
checkpointed offsets, which are updated only on checkpoint, whereas the field 
_offsets_ of GlobalStateUpdateTask is, as you said, updated for each record. 
Hence the order here: the last checkpointed offsets are passed first and the 
current offsets second.
   For ProcessorStateManager, the method computes the offsets from 
StateStoreMetadata, which is in sync with the last offset of the changelog topic.
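
To make the argument order concrete, here is a minimal, self-contained sketch. It assumes the helper treats its second argument as the old snapshot and its third as the new one, and asks for a checkpoint once the summed offset delta exceeds a threshold. The class name, the `String` partition keys and the threshold value are hypothetical, and this is not the actual `StateManagerUtil` code.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in only; not the Kafka Streams implementation.
class CheckpointOrderSketch {

    // Hypothetical threshold, chosen for the example.
    static final long DELTA_THRESHOLD = 10_000L;

    // oldSnapshot: offsets as of the last checkpoint.
    // newSnapshot: offsets advanced on every processed record.
    static boolean checkpointNeeded(final boolean enforceCheckpoint,
                                    final Map<String, Long> oldSnapshot,
                                    final Map<String, Long> newSnapshot) {
        // Assumption: without an old snapshot there is nothing to compare against.
        if (oldSnapshot == null) {
            return false;
        }
        if (enforceCheckpoint) {
            return true;
        }
        long totalDelta = 0L;
        // Iterate over the new snapshot; partitions missing from the old one count from 0.
        for (final Map.Entry<String, Long> entry : newSnapshot.entrySet()) {
            totalDelta += Math.abs(oldSnapshot.getOrDefault(entry.getKey(), 0L) - entry.getValue());
        }
        return totalDelta > DELTA_THRESHOLD;
    }

    public static void main(final String[] args) {
        // Plays the role of stateMgr.changelogOffsets() for the global state manager.
        final Map<String, Long> lastCheckpointed = new HashMap<>();
        lastCheckpointed.put("global-topic-0", 100L);

        // Plays the role of the task's own offsets field.
        final Map<String, Long> current = new HashMap<>();
        current.put("global-topic-0", 100L + 10_001L);

        System.out.println(checkpointNeeded(false, lastCheckpointed, current));
    }
}
```

Under these assumptions, passing the last checkpointed offsets as the old snapshot means the delta measures how far the task has progressed since the previous checkpoint.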



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
