fredia commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r895925820


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
                     subtaskIndex);
         }
 
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (currentMaterializationID == null
+                            || materializationID != currentMaterializationID.f0) {
+                        currentMaterializationID = Tuple2.of(materializationID, checkpointId);
+                        referredByCheckpointID.clear();

Review Comment:
   I added `lastCheckpointId` to detect whether a checkpoint arrives out of order.
   - If it does, it is not written to the local store and the method returns directly. Depending on the materialization involved, there are two cases:
        - A newer materialization has already been seen, e.g. (1) cp1 with m1, (2) cp3 with m2, (3) cp2 with m1. When cp3 is confirmed, cp1 and m1 are discarded, and cp2 is never written to the local store. Therefore nothing is kept apart from cp3 and m2.
        - No newer materialization has been seen, e.g. (1) cp1 with m1, (2) cp3 with m1, (3) cp2 with m1. When cp3 is confirmed, m1 is retained, and cp2 is never written to the local store. In the end, only m1 and cp3 are kept.
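
To make the two cases concrete, here is a minimal, self-contained sketch of the check. It is a simplified model, not the code in `TaskLocalStateStoreImpl`: the class `LocalStoreSketch` and its methods are hypothetical names for illustration, and it assumes that confirming a checkpoint drops every older locally stored checkpoint, and that a materialization is kept only while some stored checkpoint refers to it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical model of the local store; not Flink's actual implementation. */
class LocalStoreSketch {
    // Highest checkpoint id accepted so far; -1 means nothing stored yet.
    private long lastCheckpointId = -1L;

    // checkpointId -> materializationId for the snapshots kept locally.
    private final TreeMap<Long, Long> stored = new TreeMap<>();

    /** Returns false, storing nothing, when the checkpoint arrives out of order. */
    boolean store(long checkpointId, long materializationId) {
        if (checkpointId < lastCheckpointId) {
            return false; // out of order: skip the local store entirely
        }
        lastCheckpointId = checkpointId;
        stored.put(checkpointId, materializationId);
        return true;
    }

    /** Assumption: confirming a checkpoint discards all older stored checkpoints. */
    void confirm(long checkpointId) {
        stored.headMap(checkpointId, false).clear();
    }

    List<Long> keptCheckpoints() {
        return new ArrayList<>(stored.keySet());
    }

    /** Materializations still referenced by at least one stored checkpoint. */
    Set<Long> keptMaterializations() {
        return new TreeSet<>(stored.values());
    }
}
```

Replaying the first case (cp1/m1, cp3/m2, cp2/m1, confirm cp3) against this model leaves only cp3 and m2. Replaying the second (cp1/m1, cp3/m1, cp2/m1, confirm cp3) leaves cp3 and m1.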



