[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988894#comment-15988894 ]
ASF GitHub Bot commented on FLINK-5892:
---------------------------------------
Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/3770#discussion_r113928558
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -62,252 +69,390 @@ public StateAssignmentOperation(
     }
     public boolean assignStates() throws Exception {
-
-        // this tracks if we find missing node hash ids and already use secondary mappings
-        boolean expandedToLegacyIds = false;
-
+        Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
         Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
-        for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
-
-            TaskState taskState = taskGroupStateEntry.getValue();
-
-            //----------------------------------------find vertex for state---------------------------------------------
-
-            ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-
-            // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
-            // for example as generated from older flink versions, to provide backwards compatibility.
-            if (executionJobVertex == null && !expandedToLegacyIds) {
-                localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
-                executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
-                expandedToLegacyIds = true;
-                logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
-            }
+        Set<OperatorID> allOperatorIDs = new HashSet<>();
+        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
+            allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
+        }
+        for (Map.Entry<OperatorID, OperatorState> taskGroupStateEntry : taskStates.entrySet()) {
--- End diff --
Renaming `taskStates` and `taskGroupStateEntry` to something with `operator`
instead of `task` in the name would make this more readable, maybe
`operatorToStateMapping`. I guess this is just a leftover from the refactoring.
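To make the naming suggestion concrete, here is a minimal, self-contained sketch of the loop using `operator`-based names. It is not the PR's code: `JobVertexID`, `OperatorID`, `OperatorState` and `ExecutionJobVertex` are hypothetical stand-in records rather than Flink's classes, and the helper `findOperatorStatesWithoutOperator` is made up for illustration, because the diff excerpt does not show what the loop body actually does.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OperatorStateRenameSketch {

    // Hypothetical stand-ins for Flink's runtime types, reduced to what this sketch needs.
    record JobVertexID(String id) {}
    record OperatorID(String id) {}
    record OperatorState(String payload) {}
    record ExecutionJobVertex(List<OperatorID> operatorIDs) {}

    /**
     * Hypothetical helper: returns the IDs of operator states that have no matching
     * operator in the job. Variables are named after operators, not tasks.
     */
    static Set<OperatorID> findOperatorStatesWithoutOperator(
            Map<OperatorID, OperatorState> operatorToStateMapping,
            Map<JobVertexID, ExecutionJobVertex> tasks) {

        // Gather the operator IDs of every job vertex, as allOperatorIDs does in the diff.
        Set<OperatorID> allOperatorIDs = new HashSet<>();
        for (ExecutionJobVertex executionJobVertex : tasks.values()) {
            allOperatorIDs.addAll(executionJobVertex.operatorIDs());
        }

        // Iterate per operator: the entry variable now says what it holds.
        Set<OperatorID> unmatched = new HashSet<>();
        for (Map.Entry<OperatorID, OperatorState> operatorStateEntry : operatorToStateMapping.entrySet()) {
            if (!allOperatorIDs.contains(operatorStateEntry.getKey())) {
                unmatched.add(operatorStateEntry.getKey());
            }
        }
        return unmatched;
    }

    public static void main(String[] args) {
        OperatorID source = new OperatorID("source");
        OperatorID removedMap = new OperatorID("removed-map");
        Map<OperatorID, OperatorState> operatorToStateMapping = Map.of(
                source, new OperatorState("offsets"),
                removedMap, new OperatorState("counts"));
        Map<JobVertexID, ExecutionJobVertex> tasks = Map.of(
                new JobVertexID("v1"), new ExecutionJobVertex(List.of(source)));

        // Prints the single unmatched operator ID ("removed-map").
        System.out.println(findOperatorStatesWithoutOperator(operatorToStateMapping, tasks));
    }
}
```

Only the identifiers differ from the diff's style; with names like `operatorToStateMapping` and `operatorStateEntry`, a reader no longer has to remember that the map is keyed by `OperatorID` rather than by task.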
> Recover job state at the granularity of operator
> ------------------------------------------------
>
> Key: FLINK-5892
> URL: https://issues.apache.org/jira/browse/FLINK-5892
> Project: Flink
> Issue Type: New Feature
> Components: State Backends, Checkpointing
> Reporter: Guowei Ma
> Assignee: Guowei Ma
>
> JobGraph has no `Operator` info, so `ExecutionGraph` can only recover state at
> the granularity of tasks.
> As a result, an operator of the job may fail to recover its state from a
> savepoint even if the savepoint contains that operator's state.
>
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.
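A toy model can illustrate the difference between task-level and operator-level recovery described in the issue. The sketch assumes, purely for illustration, that a task is identified by an ID and holds a list of operator IDs, and that regrouping a chain produces tasks with new IDs; this is a simplification, not a description of how Flink derives `JobVertexID`s. `Task`, `recoverByTask` and `recoverByOperator` are hypothetical names, not Flink API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OperatorGranularityRecovery {

    // Hypothetical toy model: a task is an ID plus the chain of operator IDs it runs.
    record Task(String taskId, List<String> operatorIds) {}

    /** Task-level recovery: an operator's state is found only if its task ID matches a saved task. */
    static Map<String, String> recoverByTask(Map<String, Map<String, String>> savedStateByTask, List<Task> tasks) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Task task : tasks) {
            Map<String, String> taskState = savedStateByTask.get(task.taskId());
            if (taskState == null) {
                // No saved state under this task ID (e.g. the chain was regrouped),
                // so none of its operators are restored.
                continue;
            }
            for (String operatorId : task.operatorIds()) {
                String state = taskState.get(operatorId);
                if (state != null) {
                    restored.put(operatorId, state);
                }
            }
        }
        return restored;
    }

    /** Operator-level recovery: each operator looks up its own state, whichever task runs it. */
    static Map<String, String> recoverByOperator(Map<String, String> savedStateByOperator, List<Task> tasks) {
        Map<String, String> restored = new LinkedHashMap<>();
        for (Task task : tasks) {
            for (String operatorId : task.operatorIds()) {
                String state = savedStateByOperator.get(operatorId);
                if (state != null) {
                    restored.put(operatorId, state);
                }
            }
        }
        return restored;
    }

    public static void main(String[] args) {
        // Savepoint taken while "source" and "map" were chained into task "t1".
        Map<String, Map<String, String>> byTask = Map.of("t1", Map.of("source", "offsets", "map", "counts"));
        Map<String, String> byOperator = Map.of("source", "offsets", "map", "counts");

        // New job: the chain was split, so (in this toy model) both task IDs changed.
        List<Task> newJob = List.of(new Task("t2", List.of("source")), new Task("t3", List.of("map")));

        System.out.println(recoverByTask(byTask, newJob));         // prints {}
        System.out.println(recoverByOperator(byOperator, newJob)); // prints {source=offsets, map=counts}
    }
}
```

The savepoint holds the same data in both cases; only the lookup key changes. Keying by operator lets the state survive a change in how operators are grouped into tasks.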
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)