rkhachatryan commented on code in PR #19448:
URL: https://github.com/apache/flink/pull/19448#discussion_r880787079


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java:
##########
@@ -57,13 +58,30 @@ public interface SharedStateRegistry extends AutoCloseable {
      */
     StreamStateHandle registerReference(
             SharedStateRegistryKey registrationKey, StreamStateHandle state, 
long checkpointID);
+
+    /**
+     * Register a reference to the given checkpoint in the registry.
+     *
+     * @param checkpoint which is completed
+     */
+    void registerCompletedCheckpoint(CompletedCheckpoint checkpoint);

Review Comment:
   Wouldn't it be more clear to pass the checkpoint to 
`unregisterUnusedStateAndCheckpoint` (and remove this method)?
   
   That would require changing `Optional<Checkpoint>` to `List` in 
   `StandaloneCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne`, but 
that shouldn't be an issue.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -47,6 +54,9 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
     /** All registered state objects by an artificial key */
     private final Map<SharedStateRegistryKey, SharedStateEntry> 
registeredStates;
 
+    /** All registered checkpoints */
+    private final Map<Long, CompletedCheckpoint> registeredCheckpoints;

Review Comment:
   Rename to `subsumedCheckpoints`? (and update the javadoc)



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java:
##########
@@ -156,16 +175,47 @@ public void unregisterUnusedState(long 
lowestCheckpointID) {
                         subsumed.add(entry.stateHandle);
                     }
                     it.remove();
+                } else {
+                    
markCheckpointInUseAction.accept(entry.createdByCheckpointID);
                 }
             }
         }
-
         LOG.trace("Discard {} state asynchronously", subsumed.size());
         for (StreamStateHandle handle : subsumed) {
             scheduleAsyncDelete(handle);
         }
     }
 
+    @Override
+    public void unregisterUnusedState(long lowestCheckpointID) {
+        unregisterState(lowestCheckpointID, (x) -> {});

Review Comment:
   Now that subsumed checkpoints are "owned" by the registry, it has to discard 
them. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to