pnowojski commented on a change in pull request #18539: URL: https://github.com/apache/flink/pull/18539#discussion_r795622785
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ########## @@ -203,10 +204,37 @@ public long getStateSize() { * * @param sharedStateRegistry The registry where shared states are registered */ - public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) { - // in claim mode we should not register any shared handles + public void registerSharedStatesAfterRestored( + SharedStateRegistry sharedStateRegistry, boolean claim) { Review comment: nit: missing java doc ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ########## @@ -203,10 +204,37 @@ public long getStateSize() { * * @param sharedStateRegistry The registry where shared states are registered */ - public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateRegistry) { - // in claim mode we should not register any shared handles + public void registerSharedStatesAfterRestored( + SharedStateRegistry sharedStateRegistry, boolean claim) { + // in no_claim mode we should not register any shared handles if (!props.isUnclaimed()) { - sharedStateRegistry.registerAll(operatorStates.values(), checkpointID); + if (claim) { Review comment: ``` if (!props.isUnclaimed) { // == if (props.isClaimed()) if (claim) { // == is doubly claimed? } else { // == it is claimed, but not really? } } ``` 🤔 Can you explain what's happening here? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ########## @@ -71,5 +71,22 @@ StreamStateHandle registerReference( */ void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID); + /** + * Register given shared states in the registry along with a custom location for shared files. + * The custom location will be cleaned, once all of corresponding registered shared handles are + * unregistered. Should be used Review comment: > Should be used ? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java ########## @@ -224,19 +289,27 @@ public void close() { } } + @FunctionalInterface + private interface PostDispose { + void execute() throws Exception; + } + /** Encapsulates the operation the delete state handles asynchronously. */ private static final class AsyncDisposalRunnable implements Runnable { - private final StateObject toDispose; + private final StreamStateHandle toDispose; + private final PostDispose postDispose; - public AsyncDisposalRunnable(StateObject toDispose) { + public AsyncDisposalRunnable(StreamStateHandle toDispose, PostDispose postDispose) { Review comment: Can not we just pass `stateEntry.registryKey` here as a parameter instead of creating a functional interface with lambda functions? ########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java ########## @@ -71,5 +71,22 @@ StreamStateHandle registerReference( */ void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, long checkpointID); + /** + * Register given shared states in the registry along with a custom location for shared files. + * The custom location will be cleaned, once all of corresponding registered shared handles are + * unregistered. Should be used Review comment: nit: > Should be used ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org