fredia commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r853004206


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java:
##########
@@ -211,13 +220,87 @@ public long getStateSize() {
      * @param restoreMode the mode in which this checkpoint was restored from
      */
     public void registerSharedStatesAfterRestored(
-            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
+            SharedStateRegistry sharedStateRegistry,
+            RestoreMode restoreMode,
+            boolean changelogEnabled) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
+            if (changelogEnabled) {
+                for (OperatorState operatorState : operatorStates.values()) {
+                    for (Map.Entry<Integer, OperatorSubtaskState> entry :
+                            operatorState.getSubtaskStates().entrySet()) {
+                        List<KeyedStateHandle> changelogStateBackendHandles =
+                                entry.getValue().getManagedKeyedState().stream()
+                                        .map(x -> getChangelogStateBackendHandle(x))
+                                        .collect(Collectors.toList());
+                        StateObjectCollection<KeyedStateHandle> stateHandles =
+                                new StateObjectCollection<>(changelogStateBackendHandles);
+                        operatorState.putState(
+                                entry.getKey(),
+                                entry.getValue()
+                                        .toBuilder()
+                                        .setManagedKeyedState(stateHandles)
+                                        .build());

Review Comment:
   Thanks for your suggestion!
   Keeping `CompletedCheckpoint` immutable is a better approach, so I moved the code that rebuilds the checkpoint to `Checkpoints.loadAndValidateCheckpoint` and put the cast logic in `ChangelogStateBackendHandle.ChangelogStateBackendHandleImpl`.
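   A minimal sketch of the "rebuild instead of mutate" idea, assuming heavily simplified types: `KeyedHandle`, `PlainHandle`, `ChangelogHandle`, `SubtaskState`, `toChangelogHandle`, and `rebuildForChangelog` are hypothetical stand-ins, not Flink's `KeyedStateHandle`, `OperatorSubtaskState`, or `ChangelogStateBackendHandleImpl` API. The assumed conversion wraps plain handles and leaves changelog handles untouched; the loaded state is converted into a new, unmodifiable map and the input is never modified.

   ```java
   import java.util.Collections;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;
   import java.util.stream.Collectors;

   public class ImmutableRebuildSketch {

       /** Hypothetical stand-in for a keyed state handle. */
       interface KeyedHandle {
           String id();
       }

       /** Hypothetical plain (non-changelog) keyed handle. */
       static final class PlainHandle implements KeyedHandle {
           private final String id;

           PlainHandle(String id) {
               this.id = id;
           }

           public String id() {
               return id;
           }
       }

       /** Hypothetical changelog wrapper around already materialized handles. */
       static final class ChangelogHandle implements KeyedHandle {
           private final List<KeyedHandle> materialized;

           ChangelogHandle(List<KeyedHandle> materialized) {
               this.materialized = Collections.unmodifiableList(materialized);
           }

           public String id() {
               return "changelog(" + materialized.get(0).id() + ")";
           }
       }

       /** Assumed cast logic: wrap plain handles, leave changelog handles as they are. */
       static KeyedHandle toChangelogHandle(KeyedHandle h) {
           return h instanceof ChangelogHandle ? h : new ChangelogHandle(Collections.singletonList(h));
       }

       /** Hypothetical immutable per-subtask state. */
       static final class SubtaskState {
           private final List<KeyedHandle> managedKeyed;

           SubtaskState(List<KeyedHandle> managedKeyed) {
               this.managedKeyed = Collections.unmodifiableList(managedKeyed);
           }

           List<KeyedHandle> managedKeyed() {
               return managedKeyed;
           }
       }

       /** Returns a NEW map of converted subtask states; the input map and its values are untouched. */
       static Map<Integer, SubtaskState> rebuildForChangelog(Map<Integer, SubtaskState> loaded) {
           Map<Integer, SubtaskState> rebuilt = new LinkedHashMap<>();
           for (Map.Entry<Integer, SubtaskState> e : loaded.entrySet()) {
               List<KeyedHandle> converted = e.getValue().managedKeyed().stream()
                       .map(ImmutableRebuildSketch::toChangelogHandle)
                       .collect(Collectors.toList());
               rebuilt.put(e.getKey(), new SubtaskState(converted));
           }
           return Collections.unmodifiableMap(rebuilt);
       }

       public static void main(String[] args) {
           Map<Integer, SubtaskState> loaded = new LinkedHashMap<>();
           loaded.put(0, new SubtaskState(Collections.singletonList(new PlainHandle("sst-0"))));
           Map<Integer, SubtaskState> rebuilt = rebuildForChangelog(loaded);
           System.out.println(rebuilt.get(0).managedKeyed().get(0).id()); // changelog(sst-0)
           System.out.println(loaded.get(0).managedKeyed().get(0).id()); // sst-0
       }
   }
   ```

   Because the conversion produces new objects, the loaded checkpoint object itself never has to change after construction.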
   
   > I think registering all KeyedStateHandles with the SharedStateRegistry on recovery in CLAIM mode would also solve the problem, wouldn't it?
   
   I think this suggestion may not work either, because the `discardState()` of some KeyedStateHandles is **not empty**, so their state would be discarded when the checkpoint is subsumed.
   I also left a comment under [FLINK-25872](https://issues.apache.org/jira/browse/FLINK-25872); maybe we can discuss it in the ticket.
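   A toy model of the concern above, assuming a simplified reference-counting registry: `Handle`, `FileHandle`, `ToyRegistry`, and the in-memory `STORAGE` set are hypothetical and are not Flink's `SharedStateRegistry` or `KeyedStateHandle` API. It only illustrates the mechanism: once a handle whose `discardState()` really deletes data is registered, subsuming the last checkpoint that references it triggers that deletion.

   ```java
   import java.util.HashMap;
   import java.util.HashSet;
   import java.util.Map;
   import java.util.Set;

   public class SubsumeDiscardSketch {

       /** Hypothetical stand-in for a keyed state handle. */
       interface Handle {
           String key();

           void discardState();
       }

       /** Toy storage: names of files that currently exist. */
       static final Set<String> STORAGE = new HashSet<>();

       /** A handle whose discardState() is NOT empty: it deletes the file it points to. */
       static final class FileHandle implements Handle {
           private final String file;

           FileHandle(String file) {
               this.file = file;
           }

           public String key() {
               return file;
           }

           public void discardState() {
               STORAGE.remove(file);
           }
       }

       /** Toy reference-counting registry (hypothetical, for illustration only). */
       static final class ToyRegistry {
           private final Map<String, Integer> refCounts = new HashMap<>();
           private final Map<String, Handle> handles = new HashMap<>();
           private final Map<Long, Set<String>> keysByCheckpoint = new HashMap<>();

           /** Records that the given checkpoint references the handle (counted once per checkpoint). */
           void register(long checkpointId, Handle handle) {
               handles.putIfAbsent(handle.key(), handle);
               if (keysByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(handle.key())) {
                   refCounts.merge(handle.key(), 1, Integer::sum);
               }
           }

           /** Drops the checkpoint's references and discards handles nobody references anymore. */
           void subsume(long checkpointId) {
               Set<String> keys = keysByCheckpoint.remove(checkpointId);
               if (keys == null) {
                   return;
               }
               for (String key : keys) {
                   int remaining = refCounts.merge(key, -1, Integer::sum);
                   if (remaining == 0) {
                       refCounts.remove(key);
                       handles.remove(key).discardState();
                   }
               }
           }
       }

       public static void main(String[] args) {
           STORAGE.add("restored-sst");
           ToyRegistry registry = new ToyRegistry();
           // On recovery, the restored checkpoint (id 1) registers its keyed handle.
           registry.register(1L, new FileHandle("restored-sst"));
           // A newer checkpoint completes and checkpoint 1 is subsumed.
           registry.subsume(1L);
           // discardState() is not a no-op, so the restored file is gone.
           System.out.println(STORAGE.contains("restored-sst")); // false
       }
   }
   ```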



-- 
This is an automated message from the Apache Git Service.