pnowojski commented on a change in pull request #18539:
URL: https://github.com/apache/flink/pull/18539#discussion_r795622785



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -203,10 +204,37 @@ public long getStateSize() {
      *
      * @param sharedStateRegistry The registry where shared states are 
registered
      */
-    public void registerSharedStatesAfterRestored(SharedStateRegistry 
sharedStateRegistry) {
-        // in claim mode we should not register any shared handles
+    public void registerSharedStatesAfterRestored(
+            SharedStateRegistry sharedStateRegistry, boolean claim) {

Review comment:
       nit: missing java doc

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -203,10 +204,37 @@ public long getStateSize() {
      *
      * @param sharedStateRegistry The registry where shared states are 
registered
      */
-    public void registerSharedStatesAfterRestored(SharedStateRegistry 
sharedStateRegistry) {
-        // in claim mode we should not register any shared handles
+    public void registerSharedStatesAfterRestored(
+            SharedStateRegistry sharedStateRegistry, boolean claim) {
+        // in no_claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
-            sharedStateRegistry.registerAll(operatorStates.values(), 
checkpointID);
+            if (claim) {

Review comment:
       ```
   if (!props.isUnclaimed) { // == if (props.isClaimed())
     if (claim) { // == is doubly claimed?
     } 
     else {  // == it is claimed, but not really?
     }
   }
   ```
   🤔 Can you explain what's happening here?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
##########
@@ -71,5 +71,22 @@ StreamStateHandle registerReference(
      */
     void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, 
long checkpointID);
 
+    /**
+     * Register given shared states in the registry along with a custom 
location for shared files.
+     * The custom location will be cleaned, once all of corresponding 
registered shared handles are
+     * unregistered. Should be used

Review comment:
       > Should be used
   
   ?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
##########
@@ -224,19 +289,27 @@ public void close() {
         }
     }
 
+    @FunctionalInterface
+    private interface PostDispose {
+        void execute() throws Exception;
+    }
+
     /** Encapsulates the operation the delete state handles asynchronously. */
     private static final class AsyncDisposalRunnable implements Runnable {
 
-        private final StateObject toDispose;
+        private final StreamStateHandle toDispose;
+        private final PostDispose postDispose;
 
-        public AsyncDisposalRunnable(StateObject toDispose) {
+        public AsyncDisposalRunnable(StreamStateHandle toDispose, PostDispose 
postDispose) {

Review comment:
       Can not we just pass `stateEntry.registryKey` here as a parameter 
instead of creating a functional interface with lambda functions?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
##########
@@ -71,5 +71,22 @@ StreamStateHandle registerReference(
      */
     void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, 
long checkpointID);
 
+    /**
+     * Register given shared states in the registry along with a custom 
location for shared files.
+     * The custom location will be cleaned, once all of corresponding 
registered shared handles are
+     * unregistered. Should be used

Review comment:
       nit:
   > Should be used
   
   ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to