fredia commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r895925820


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -141,6 +158,29 @@ public void storeLocalState(
                     subtaskIndex);
         }
 
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = changelogStateBackendHandle.getMaterializationID();
+                    if (currentMaterializationID == null
+                            || materializationID != currentMaterializationID.f0) {
+                        currentMaterializationID = Tuple2.of(materializationID, checkpointId);
+                        referredByCheckpointID.clear();

Review Comment:
   I added `lastCheckpointId` to check whether a checkpoint arrives out of order.
   - If it does, it is not written to the local store and the method returns directly. Depending on the materializations involved, there are two cases:
        - A newer materialization has already been stored, say (1) cp1 with m1, (2) cp3 with m2, (3) cp2 with m1. When cp3 is confirmed, cp1 and m1 are discarded, and cp2 is never written to the local store. Therefore, nothing is kept apart from cp3 and m2.
        - No newer materialization has been stored, say (1) cp1 with m1, (2) cp3 with m1, (3) cp2 with m1. When cp3 is confirmed, cp1 is retained because cp3 still refers to its state, and cp2 is never written to the local store. In the end, cp1, m1, and cp3 are kept.
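
The two cases above can be replayed with a small standalone model. This is only a sketch of the behaviour described in this comment, not the code in the PR: the class `OutOfOrderCheckpointSketch`, its methods `store`/`confirm`, and the `materializationOwner` map are hypothetical names, and the pruning rule (drop older checkpoints unless they created the materialization that the confirmed checkpoint uses) is my assumption about how retention works.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical model of the out-of-order check; not the Flink implementation. */
class OutOfOrderCheckpointSketch {
    /** checkpointId -> materializationId of the state stored for that checkpoint. */
    private final SortedMap<Long, Long> stored = new TreeMap<>();
    /** materializationId -> id of the checkpoint that first stored it. */
    private final Map<Long, Long> materializationOwner = new HashMap<>();
    /** Highest checkpoint id stored so far; -1 means nothing has been stored yet. */
    private long lastCheckpointId = -1L;

    /** Returns false, without storing anything, if the checkpoint is out of order. */
    boolean store(long checkpointId, long materializationId) {
        if (checkpointId < lastCheckpointId) {
            return false; // out of order: skip the local store entirely
        }
        lastCheckpointId = checkpointId;
        stored.put(checkpointId, materializationId);
        materializationOwner.putIfAbsent(materializationId, checkpointId);
        return true;
    }

    /** Assumed pruning rule: keep the confirmed checkpoint and the owner of its materialization. */
    void confirm(long checkpointId) {
        Long materializationId = stored.get(checkpointId);
        if (materializationId == null) {
            return; // nothing was stored for this checkpoint
        }
        long owner = materializationOwner.get(materializationId);
        // Drop every older checkpoint except the one that created the materialization.
        stored.headMap(checkpointId).keySet().removeIf(id -> id != owner);
        // Drop materializations that no remaining checkpoint refers to.
        materializationOwner.keySet().retainAll(stored.values());
    }

    List<Long> keptCheckpoints() {
        return new ArrayList<>(stored.keySet());
    }

    List<Long> keptMaterializations() {
        List<Long> ids = new ArrayList<>(materializationOwner.keySet());
        ids.sort(null); // natural order, for stable output
        return ids;
    }

    public static void main(String[] args) {
        // Case 1: cp1/m1, cp3/m2, then the late cp2/m1.
        OutOfOrderCheckpointSketch newer = new OutOfOrderCheckpointSketch();
        newer.store(1, 1);
        newer.store(3, 2);
        newer.store(2, 1);
        newer.confirm(3);
        System.out.println(newer.keptCheckpoints() + " " + newer.keptMaterializations());

        // Case 2: cp1/m1, cp3/m1, then the late cp2/m1.
        OutOfOrderCheckpointSketch same = new OutOfOrderCheckpointSketch();
        same.store(1, 1);
        same.store(3, 1);
        same.store(2, 1);
        same.confirm(3);
        System.out.println(same.keptCheckpoints() + " " + same.keptMaterializations());
    }
}
```

In this model the first scenario ends with only cp3 and m2, and the second ends with cp1, cp3, and m1, matching the outcomes described above.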



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -98,6 +104,15 @@ public class TaskLocalStateStoreImpl implements OwnedTaskLocalStateStore {
     @GuardedBy("lock")
     private final SortedMap<Long, TaskStateSnapshot> storedTaskStateByCheckpointID;
 
+    /** The relationship between checkpoints. (cp1,cp3) means cp3 refer to cp1's state. */
+    private final Map<Long, Long> referredByCheckpointID;
+
+    /**
+     * (MaterializationID, created by checkpointID). Because local store only keeps one checkpoint,
+     * at most one materialization can exist at the same time.
+     */
+     */
+    private Tuple2<Long, Long> currentMaterializationID;

Review Comment:
   Nice suggestion. I created `ChangelogTaskLocalStateStore` to encapsulate the changelog-specific logic.


