Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3770#discussion_r113929555 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java --- @@ -62,252 +69,390 @@ public StateAssignmentOperation( } public boolean assignStates() throws Exception { - - // this tracks if we find missing node hash ids and already use secondary mappings - boolean expandedToLegacyIds = false; - + Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates); Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks; - for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) { - - TaskState taskState = taskGroupStateEntry.getValue(); - - //----------------------------------------find vertex for state--------------------------------------------- - - ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); - - // on the first time we can not find the execution job vertex for an id, we also consider alternative ids, - // for example as generated from older flink versions, to provide backwards compatibility. - if (executionJobVertex == null && !expandedToLegacyIds) { - localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks); - executionJobVertex = localTasks.get(taskGroupStateEntry.getKey()); - expandedToLegacyIds = true; - logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search."); - } + Set<OperatorID> allOperatorIDs = new HashSet<>(); + for (ExecutionJobVertex executionJobVertex : tasks.values()) { + allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs())); + } + for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) { --- End diff -- It looks like this loop could be factored out into a private precondition method, for example `checkStateMappingCompleteness` or something similar.
The previous loop, and everything else that works on the hash set, could move into that method as well.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and you would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---