fredia commented on code in PR #21822:
URL: https://github.com/apache/flink/pull/21822#discussion_r1138645827

##########
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java:
##########
@@ -455,12 +467,23 @@ private SnapshotResult<ChangelogStateBackendHandle> buildSnapshotResult(
                         checkpointId,
                         changelogStateBackendStateCopy.materializationID,
                         persistedSizeOfThisCheckpoint);
-        return SnapshotResult.withLocalState(
-                jmHandle,
+        ChangelogStateBackendLocalHandle localHandle =
                 new ChangelogStateBackendLocalHandle(
                         changelogStateBackendStateCopy.getLocalMaterializedSnapshot(),
                         localDeltaCopy,
-                        jmHandle));
+                        jmHandle);
+        // register local handle to localRegistry
+        for (ChangelogStateHandle handle : localDeltaCopy) {
+            if (handle instanceof ChangelogStateHandleStreamImpl) {
+                ((ChangelogStateHandleStreamImpl) handle)
+                        .getHandlesAndOffsets()
+                        .forEach(
+                                tuple ->
+                                        localChangelogRegistry.register(

Review Comment:
   The registration that used to happen in `confirm()` has been moved to `ChangelogKeyedStateBackend`, so each local handle is now registered only once per checkpoint.
   
   This change affects `LocalChangelogRegistryImpl#prune()`. Before this change, `prune()` could not cause a `fileNotFound` error, because on the TM `notifyAbort()` cannot arrive after `notifyComplete()` (the `lastUsedCheckpointId` of the previous checkpoint was only updated in `notifyComplete()`).
   
   I plan to fix the `abort()` problem in a separate commit.
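
   To make the concern concrete, here is a toy model of one possible reading of it. This is not Flink's implementation: `LocalRegistrySketch`, the string handle names, and the rule that `prune(id)` drops every handle whose last registered checkpoint equals `id` are all assumptions made for illustration. Under those assumptions, registering at snapshot time lets an aborted checkpoint remove a file that an earlier checkpoint still references.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model only (hypothetical, not Flink code): handles are plain strings and
 * "deleting a local file" is modelled as removing a map entry.
 */
public class LocalRegistrySketch {
    // handle name -> highest checkpoint ID that has registered it so far
    private final Map<String, Long> lastUsedCheckpointId = new HashMap<>();

    /** Records that the handle is used by the given checkpoint, keeping the highest ID. */
    public void register(String handle, long checkpointId) {
        lastUsedCheckpointId.merge(handle, checkpointId, Long::max);
    }

    /** Assumed abort-path rule: drop handles last used by the aborted checkpoint. */
    public void prune(long abortedCheckpointId) {
        lastUsedCheckpointId.values().removeIf(id -> id == abortedCheckpointId);
    }

    public boolean contains(String handle) {
        return lastUsedCheckpointId.containsKey(handle);
    }

    public static void main(String[] args) {
        LocalRegistrySketch registry = new LocalRegistrySketch();
        registry.register("delta-1", 1L); // snapshot of checkpoint 1
        registry.register("delta-1", 2L); // snapshot of checkpoint 2 reuses the same file
        registry.prune(2L);               // checkpoint 2 is aborted
        // Checkpoint 1 still references "delta-1", but the entry is gone.
        System.out.println(registry.contains("delta-1")); // prints "false"
    }
}
```

   In this sketch, if `register` were only called on confirmation, the entry would still carry ID 1 when checkpoint 2 is aborted, and `prune(2L)` would leave it alone.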