rkhachatryan commented on a change in pull request #18086:
URL: https://github.com/apache/flink/pull/18086#discussion_r768964348



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
##########
@@ -172,4 +174,19 @@ OperatorStateBackend createOperatorStateBackend(
     default boolean useManagedMemory() {
         return false;
     }
+
+    /**
+     * Tells if a state backend supports taking forced full snapshots. Forced 
full snapshots are
+     * snapshots that do not share files with previous snapshots. The feature 
is necessary to
+     * properly support {@link RestoreMode#NO_CLAIM} mode.
+     *
+     * <p>If a state backend supports forced full snapshots, it should create 
an independent
+     * snapshot when it receives {@link 
CheckpointOptions#isForcedFullSnapshot()} request in {@link
+     * Snapshotable#snapshot(long, long, CheckpointStreamFactory, 
CheckpointOptions)}.
+     *
+     * @return If the state backend supports taking forced full snapshots.
+     */
+    default boolean supportsForcedFullSnapshot() {

Review comment:
       > As for the checkpointId, what would it mean in that context?
   
   My reasoning is that we don't have to re-upload **all** state if first `N` 
checkpoints fail, only the "old" state. 
   And I was erroneously assuming that there will be an explicit method in 
StateBackend to take such a snapshot. In reality, `SnapshotStrategy` analyzes 
`CheckpointOptions`.
   Passing this `initialCheckpointID` in `CheckpointOptions` seems too fragile 
and complex.
   Sorry for confusion.
   
   `supportsNoClaimRestoreMode` sounds better :+1: 

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1650,6 +1657,10 @@ public boolean restoreSavepoint(
             case LEGACY:
                 checkpointProperties = 
CheckpointProperties.forSavepoint(false);
                 break;
+            case NO_CLAIM:
+                this.forceFullSnapshot = true;

Review comment:
       Thanks for the fix, LGTM. 
   Do you think it's possible and worth it to add a test?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1664,10 +1675,13 @@ public boolean restoreSavepoint(
                         allowNonRestored,
                         checkpointProperties);
 
-        // register shared state - even before adding the checkpoint to the 
store
-        // because the latter might trigger subsumption so the ref counts must 
be up-to-date
-        savepoint.registerSharedStatesAfterRestored(
-                completedCheckpointStore.getSharedStateRegistry());
+        // in claim mode we should not register any shared handles
+        if (restoreSettings.getRestoreMode() != RestoreMode.NO_CLAIM) {
+            // register shared state - even before adding the checkpoint to 
the store
+            // because the latter might trigger subsumption so the ref counts 
must be up-to-date
+            savepoint.registerSharedStatesAfterRestored(

Review comment:
       Just to clarify the updated behavior: 
   1. any checkpoint added when building `CompletedCheckpointStore` still 
registers its state with `SharedStateRegistry`; 
   2. only the latest one, iif not claimed, will have the flag in the 
properties set - and will NOT be registered with `SharedStateRegistry`
   
   Right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to