[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988895#comment-15988895 ]
ASF GitHub Bot commented on FLINK-5892:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113928105

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -62,252 +69,390 @@ public StateAssignmentOperation(
         }

         public boolean assignStates() throws Exception {
    -
    -        // this tracks if we find missing node hash ids and already use secondary mappings
    -        boolean expandedToLegacyIds = false;
    -
    +        Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
             Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
    -        for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
    -
    -            TaskState taskState = taskGroupStateEntry.getValue();
    -
    -            //----------------------------------------find vertex for state---------------------------------------------
    -
    -            ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -
    -            // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
    -            // for example as generated from older flink versions, to provide backwards compatibility.
    -            if (executionJobVertex == null && !expandedToLegacyIds) {
    -                localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
    -                executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -                expandedToLegacyIds = true;
    -                logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
    -            }
    +        Set<OperatorID> allOperatorIDs = new HashSet<>();
    +        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
    +            allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
    --- End diff --

    If we change to an immutable list instead of an array, this code also saves one conversion to a list.

> Recover job state at the granularity of operator
> ------------------------------------------------
>
>                 Key: FLINK-5892
>                 URL: https://issues.apache.org/jira/browse/FLINK-5892
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Guowei Ma
>            Assignee: Guowei Ma
>
> JobGraph has no `Operator` info, so `ExecutionGraph` can only recover at the
> granularity of task.
> As a result, an operator of the job may not recover its state from a
> savepoint even if the savepoint contains that operator's state.
>
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)