fredia commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1138645827


##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -455,12 +467,23 @@ private SnapshotResult<ChangelogStateBackendHandle> 
buildSnapshotResult(
                             checkpointId,
                             changelogStateBackendStateCopy.materializationID,
                             persistedSizeOfThisCheckpoint);
-            return SnapshotResult.withLocalState(
-                    jmHandle,
+            ChangelogStateBackendLocalHandle localHandle =
                     new ChangelogStateBackendLocalHandle(
                             
changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
                             localDeltaCopy,
-                            jmHandle));
+                            jmHandle);
+            // register local handle to localRegistry
+            for (ChangelogStateHandle handle : localDeltaCopy) {
+                if (handle instanceof ChangelogStateHandleStreamImpl) {
+                    ((ChangelogStateHandleStreamImpl) handle)
+                            .getHandlesAndOffsets()
+                            .forEach(
+                                    tuple ->
+                                            localChangelogRegistry.register(

Review Comment:
   The registration in the `confirm()` was moved to ChangelogKeyedStatebacked 
now, during one checkpoint, one local handle is only registered once.
   
   This change would affect `LocalChangelogRegistryImpl#prune()`, before this 
changes, the `prune()` wouldn't cause `fileNotFound`, because `notifyAbort()` 
is impossible to come after `notifyComplete()` on TM(The lastUsedCheckpointId 
of previous chk only update on `notifyComplete()`).  So I plan to fix the 
problem of `abort()` in another commit.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to