Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3770#discussion_r113928558 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -62,252 +69,390 @@ public StateAssignmentOperation( } public boolean assignStates() throws Exception { - - // this tracks if we find missing node hash ids and already use secondary mappings - boolean expandedToLegacyIds = false; - + Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates); Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks; - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) { - - TaskState taskState = taskGroupStateEntry.getValue(); - - //----------------------------------------find vertex for state--------------------------------------------- - - ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); - - // on the first time we can not find the execution job vertex for an id, we also consider alternative ids, - // for example as generated from older flink versions, to provide backwards compatibility. - if (executionJobVertex == null && !expandedToLegacyIds) { - localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks); - executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); - expandedToLegacyIds = true; - logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search."); - } + Set<OperatorID> allOperatorIDs = new HashSet<>(); + for (ExecutionJobVertex executionJobVertex : tasks.values()) { + allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs())); + } + for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) { --- End diff -- Renaming `taskStates` and `taskGroupStateEntry` to something that has `operator` instead of `task` in it makes this more readable - maybe `operatorToStateMapping`. 
Just some leftover from the refactoring, I guess.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---