rkhachatryan commented on code in PR #19448: URL: https://github.com/apache/flink/pull/19448#discussion_r880787079
########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java: ########## @@ -57,13 +58,30 @@ public interface SharedStateRegistry extends AutoCloseable { */ StreamStateHandle registerReference( SharedStateRegistryKey registrationKey, StreamStateHandle state, long checkpointID); + + /** + * Register a reference to the given checkpoint in the registry. + * + * @param checkpoint which is completed + */ + void registerCompletedCheckpoint(CompletedCheckpoint checkpoint); Review Comment: Wouldn't it be more clear to pass the checkpoint to `unregisterUnusedStateAndCheckpoint` (and remove this method)? That would require changing `Optional<Checkpoint>` to `List` in `StandaloneCompletedCheckpointStore.addCheckpointAndSubsumeOldestOne`, but that shouldn't be an issue. ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ########## @@ -47,6 +54,9 @@ public class SharedStateRegistryImpl implements SharedStateRegistry { /** All registered state objects by an artificial key */ private final Map<SharedStateRegistryKey, SharedStateEntry> registeredStates; + /** All registered checkpoints */ + private final Map<Long, CompletedCheckpoint> registeredCheckpoints; Review Comment: Rename to `subsumedCheckpoints`? (and update the javadoc) ########## flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java: ########## @@ -156,16 +175,47 @@ public void unregisterUnusedState(long lowestCheckpointID) { subsumed.add(entry.stateHandle); } it.remove(); + } else { + markCheckpointInUseAction.accept(entry.createdByCheckpointID); } } } - LOG.trace("Discard {} state asynchronously", subsumed.size()); for (StreamStateHandle handle : subsumed) { scheduleAsyncDelete(handle); } } + @Override + public void unregisterUnusedState(long lowestCheckpointID) { + unregisterState(lowestCheckpointID, (x) -> {}); Review Comment: Now that subsumed checkpoints are "owned" by the registry, it has to discard them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org