Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3870#discussion_r116179467
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 ---
    @@ -18,91 +18,137 @@
     
     package org.apache.flink.runtime.state;
     
    +import org.apache.flink.runtime.concurrent.Executors;
     import org.apache.flink.util.Preconditions;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.util.HashMap;
     import java.util.Map;
    +import java.util.Objects;
    +import java.util.concurrent.Executor;
     
     /**
      * A {@code SharedStateRegistry} will be deployed in the 
    - * {@link org.apache.flink.runtime.checkpoint.CheckpointCoordinator} to 
    + * {@link org.apache.flink.runtime.checkpoint.CompletedCheckpointStore} to
      * maintain the reference count of {@link SharedStateHandle}s which are 
shared
    - * among different checkpoints.
    - *
    + * among different incremental checkpoints.
      */
     public class SharedStateRegistry {
     
        private static final Logger LOG = 
LoggerFactory.getLogger(SharedStateRegistry.class);
     
        /** All registered state objects by an artificial key */
    -   private final Map<String, SharedStateRegistry.SharedStateEntry> 
registeredStates;
    +   private final Map<SharedStateRegistryKey, 
SharedStateRegistry.SharedStateEntry> registeredStates;
    +
    +   /** Executor for async state deletion */
    +   private final Executor asyncDisposalExecutor;
     
        public SharedStateRegistry() {
                this.registeredStates = new HashMap<>();
    +           this.asyncDisposalExecutor = Executors.directExecutor(); 
//TODO: FLINK-6534
    --- End diff --
    
    I totally agree that there should not be a new executor, that is why I 
marked it with the TODO. This is just a preparation for the full fix of 
FLINK-6534. My plan for the full fix is to pass the IO executor from the 
`CompletedCheckpointStore` and use it inside the registry. This will happen 
outside of any synchronization. For now, this code is a working placeholder for 
the full fix that I will do as a followup.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to