rkhachatryan commented on a change in pull request #18086: URL: https://github.com/apache/flink/pull/18086#discussion_r768964348
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java ########## @@ -172,4 +174,19 @@ OperatorStateBackend createOperatorStateBackend( default boolean useManagedMemory() { return false; } + + /** + * Tells if a state backend supports taking forced full snapshots. Forced full snapshots are + * snapshots that do not share files with previous snapshots. The feature is necessary to + * properly support {@link RestoreMode#NO_CLAIM} mode. + * + * <p>If a state backend supports forced full snapshots, it should create an independent + * snapshot when it receives {@link CheckpointOptions#isForcedFullSnapshot()} request in {@link + * Snapshotable#snapshot(long, long, CheckpointStreamFactory, CheckpointOptions)}. + * + * @return If the state backend supports taking forced full snapshots. + */ + default boolean supportsForcedFullSnapshot() { Review comment: > As for the checkpointId, what would it mean in that context? My reasoning is that we don't have to re-upload **all** state if first `N` checkpoints fail, only the "old" state. And I was erroneously assuming that there will be an explicit method in StateBackend to take such a snapshot. In reality, `SnapshotStrategy` analyzes `CheckpointOptions`. Passing this `initialCheckpointID` in `CheckpointOptions` seems too fragile and complex. Sorry for confusion. `supportsNoClaimRestoreMode` sounds better :+1: ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1650,6 +1657,10 @@ public boolean restoreSavepoint( case LEGACY: checkpointProperties = CheckpointProperties.forSavepoint(false); break; + case NO_CLAIM: + this.forceFullSnapshot = true; Review comment: Thanks for the fix, LGTM. Do you think it's possible and worth it to add a test? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ########## @@ -1664,10 +1675,13 @@ public boolean restoreSavepoint( allowNonRestored, checkpointProperties); - // register shared state - even before adding the checkpoint to the store - // because the latter might trigger subsumption so the ref counts must be up-to-date - savepoint.registerSharedStatesAfterRestored( - completedCheckpointStore.getSharedStateRegistry()); + // in claim mode we should not register any shared handles + if (restoreSettings.getRestoreMode() != RestoreMode.NO_CLAIM) { + // register shared state - even before adding the checkpoint to the store + // because the latter might trigger subsumption so the ref counts must be up-to-date + savepoint.registerSharedStatesAfterRestored( Review comment: Just to clarify the updated behavior: 1. any checkpoint added when building `CompletedCheckpointStore` still registers its state with `SharedStateRegistry`; 2. only the latest one, iif not claimed, will have the flag in the properties set - and will NOT be registered with `SharedStateRegistry` Right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org