Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113928558
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -62,252 +69,390 @@ public StateAssignmentOperation(
        }
     
        public boolean assignStates() throws Exception {
    -
    -           // this tracks if we find missing node hash ids and already use secondary mappings
    -           boolean expandedToLegacyIds = false;
    -
    +           Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
                Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
     
    -           for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
    -
    -                   TaskState taskState = taskGroupStateEntry.getValue();
    -
    -                   //----------------------------------------find vertex for state---------------------------------------------
    -
    -                   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -
    -                   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
    -                   // for example as generated from older flink versions, to provide backwards compatibility.
    -                   if (executionJobVertex == null && !expandedToLegacyIds) {
    -                           localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
    -                           executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -                           expandedToLegacyIds = true;
    -                           logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
    -                   }
    +           Set<OperatorID> allOperatorIDs = new HashSet<>();
    +           for (ExecutionJobVertex executionJobVertex : tasks.values()) {
    +                   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
    +           }
    +           for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) {
    --- End diff --
    
    Renaming `taskStates` and `taskGroupStateEntry` to something that has `operator` instead of `task` in it would make this more readable, maybe `operatorToStateMapping`. Just a leftover from the refactoring, I guess.
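    
    A minimal sketch of how the loop could read with the rename applied, assuming the name `operatorToStateMapping` suggested above and a hypothetical entry variable `operatorStateEntry` (neither is taken from the actual PR):
    
    ```java
    // Sketch only: names below just illustrate the suggested "operator" wording;
    // types are Flink's existing OperatorID / OperatorState runtime classes.
    Map<OperatorID, OperatorState> localStates = new HashMap<>(operatorToStateMapping);
    
    for (Map.Entry<OperatorID, OperatorState> operatorStateEntry : operatorToStateMapping.entrySet()) {
            OperatorState operatorState = operatorStateEntry.getValue();
            // ... the rest of the assignment logic stays unchanged ...
    }
    ```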

