GitHub user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113929555
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -62,252 +69,390 @@ public StateAssignmentOperation(
        }
     
        public boolean assignStates() throws Exception {
    -
    -           // this tracks if we find missing node hash ids and already use secondary mappings
    -           boolean expandedToLegacyIds = false;
    -
    +           Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
                Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
     
    -           for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
    -
    -                   TaskState taskState = taskGroupStateEntry.getValue();
    -
    -                   //----------------------------------------find vertex for state---------------------------------------------
    -
    -                   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -
    -                   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
    -                   // for example as generated from older flink versions, to provide backwards compatibility.
    -                   if (executionJobVertex == null && !expandedToLegacyIds) {
    -                           localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
    -                           executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -                           expandedToLegacyIds = true;
    -                           logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
    -                   }
    +           Set<OperatorID> allOperatorIDs = new HashSet<>();
    +           for (ExecutionJobVertex executionJobVertex : tasks.values()) {
    +                   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
    +           }
    +           for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) {
    --- End diff --
    
    This loop looks like it could be factored out into a private precondition
    method, e.g. `checkStateMappingCompleteness`. The previous loop and everything
    else that works on the hash set could move there as well.
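
    A rough sketch of what such a method might look like. The diff above cuts off
    before the loop body, so the check itself (fail when a state entry has no
    matching operator) is an assumption; `String` stands in for `OperatorID`, and
    the class name and parameter shapes are hypothetical rather than Flink types:

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class StateMappingSketch {

    /**
     * Hypothetical precondition check: fails if a state entry belongs to an
     * operator ID that none of the tasks declares.
     * String stands in for OperatorID; map values are left generic.
     */
    static void checkStateMappingCompleteness(
            Map<String, ?> operatorStates,
            Collection<? extends Collection<String>> operatorIDsPerTask) {

        // The "previous loop": collect every operator ID of every task.
        Set<String> allOperatorIDs = new HashSet<>();
        for (Collection<String> operatorIDs : operatorIDsPerTask) {
            allOperatorIDs.addAll(operatorIDs);
        }

        // Assumed check: each state entry must map to a known operator.
        for (String stateOperatorID : operatorStates.keySet()) {
            if (!allOperatorIDs.contains(stateOperatorID)) {
                throw new IllegalStateException(
                    "There is no operator for the state " + stateOperatorID);
            }
        }
    }

    public static void main(String[] args) {
        // Passes: "op-1" is declared by the single task.
        checkStateMappingCompleteness(
            Collections.singletonMap("op-1", "state"),
            Arrays.asList(Arrays.asList("op-1", "op-2")));

        // Fails: no task declares "op-9".
        try {
            checkStateMappingCompleteness(
                Collections.singletonMap("op-9", "state"),
                Arrays.asList(Arrays.asList("op-1", "op-2")));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

    With the hash set confined to this method, `assignStates()` would start with
    a single call and no longer carry `allOperatorIDs` as a local.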


