fredia commented on code in PR #19907: URL: https://github.com/apache/flink/pull/19907#discussion_r895925820
##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
                     subtaskIndex);
         }
 
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (currentMaterializationID == null
+                            || materializationID != currentMaterializationID.f0) {
+                        currentMaterializationID = Tuple2.of(materializationID, checkpointId);
+                        referredByCheckpointID.clear();

Review Comment:
I added `lastCheckpointId` to check whether a checkpoint arrives out of order. If it does, it is not written to the local store and the method returns immediately.

Depending on the materialization involved, there are two cases:
- A later checkpoint carries a newer materialization, e.g. (1) cp1 with m1, (2) cp3 with m2, (3) cp2 with m1. When cp3 is confirmed, cp1 and m1 are discarded, and cp2 is never written to the local store. Therefore nothing is kept apart from cp3 and m2.
- No newer materialization is involved, e.g. (1) cp1 with m1, (2) cp3 with m1, (3) cp2 with m1. When cp3 is confirmed, m1 is retained, and cp2 is never written to the local store. In the end, only m1 and cp3 are kept.
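
To make the two cases easier to check, here is a minimal, self-contained sketch of the out-of-order guard. It is not the code in this PR: `LocalStoreSketch`, `store`, `confirm`, and the `-1` sentinel are hypothetical names and choices made for illustration, and the real `TaskLocalStateStoreImpl` tracks considerably more state. The sketch assumes that confirming a checkpoint discards all older stored checkpoints and that a materialization stays alive as long as some stored checkpoint refers to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, simplified stand-in for a task-local state store (not Flink's class). */
class LocalStoreSketch {
    // Highest checkpoint ID accepted so far; -1 means nothing has been stored yet (assumption).
    private long lastCheckpointId = -1L;

    // Stored checkpoints: checkpoint ID -> materialization ID it refers to.
    private final TreeMap<Long, Long> stored = new TreeMap<>();

    /** Returns false, without storing anything, when the checkpoint is out of order. */
    boolean store(long checkpointId, long materializationId) {
        if (checkpointId < lastCheckpointId) {
            return false; // out of order: skip the local store and return directly
        }
        lastCheckpointId = checkpointId;
        stored.put(checkpointId, materializationId);
        return true;
    }

    /** Confirms a checkpoint and discards every stored checkpoint older than it. */
    void confirm(long checkpointId) {
        stored.headMap(checkpointId, false).clear();
    }

    /** IDs of the checkpoints still held in the store, in ascending order. */
    List<Long> retainedCheckpoints() {
        return new ArrayList<>(stored.keySet());
    }

    /** Materializations still referenced by at least one stored checkpoint. */
    List<Long> retainedMaterializations() {
        return new ArrayList<>(new TreeSet<>(stored.values()));
    }
}
```

Under these assumptions, storing (cp1, m1), (cp3, m2), (cp2, m1) and then confirming cp3 leaves only cp3 and m2, while storing (cp1, m1), (cp3, m1), (cp2, m1) and confirming cp3 leaves cp3 and m1. In both sequences the call for cp2 is rejected.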