[ https://issues.apache.org/jira/browse/FLINK-5892?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988895#comment-15988895 ]

ASF GitHub Bot commented on FLINK-5892:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3770#discussion_r113928105
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -62,252 +69,390 @@ public StateAssignmentOperation(
        }
     
        public boolean assignStates() throws Exception {
    -
    -           // this tracks if we find missing node hash ids and already use secondary mappings
    -           boolean expandedToLegacyIds = false;
    -
    +           Map<OperatorID, OperatorState> localStates = new HashMap<>(taskStates);
                Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;
     
    -           for (Map.Entry<JobVertexID, TaskState> taskGroupStateEntry : taskStates.entrySet()) {
    -
    -                   TaskState taskState = taskGroupStateEntry.getValue();
    -
    -                   //----------------------------------------find vertex for state---------------------------------------------
    -
    -                   ExecutionJobVertex executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -
    -                   // on the first time we can not find the execution job vertex for an id, we also consider alternative ids,
    -                   // for example as generated from older flink versions, to provide backwards compatibility.
    -                   if (executionJobVertex == null && !expandedToLegacyIds) {
    -                           localTasks = ExecutionJobVertex.includeLegacyJobVertexIDs(localTasks);
    -                           executionJobVertex = localTasks.get(taskGroupStateEntry.getKey());
    -                           expandedToLegacyIds = true;
    -                           logger.info("Could not find ExecutionJobVertex. Including legacy JobVertexIDs in search.");
    -                   }
    +           Set<OperatorID> allOperatorIDs = new HashSet<>();
    +           for (ExecutionJobVertex executionJobVertex : tasks.values()) {
    +                   allOperatorIDs.addAll(Lists.newArrayList(executionJobVertex.getOperatorIDs()));
    --- End diff --
    
    If we change `getOperatorIDs()` to return an immutable list instead of an 
array, this code also saves one conversion to a list.
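
    A minimal sketch of the difference, not the actual Flink code: `Vertex` 
is a hypothetical stand-in for `ExecutionJobVertex`, plain strings stand in 
for `OperatorID`, and `new ArrayList<>(Arrays.asList(...))` stands in for 
Guava's `Lists.newArrayList(...)` so the example needs only the JDK. With an 
array getter, every call site has to wrap or copy the array before calling 
`addAll`; with an unmodifiable list getter, the result can be passed to 
`addAll` directly.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class OperatorIdCollection {

    /** Hypothetical stand-in for ExecutionJobVertex; not Flink's real class. */
    public static final class Vertex {
        private final String[] idsArray;
        private final List<String> idsList;

        public Vertex(String... ids) {
            this.idsArray = ids.clone();
            // Built once; callers get a read-only view and cannot modify it.
            this.idsList = Collections.unmodifiableList(new ArrayList<>(Arrays.asList(ids)));
        }

        /** Array-returning variant, mirroring the getter used in the diff. */
        public String[] getOperatorIDsAsArray() {
            return idsArray;
        }

        /** List-returning variant, as suggested in the review comment. */
        public List<String> getOperatorIDs() {
            return idsList;
        }
    }

    /** Array getter: each call site converts the array to a list first. */
    public static Set<String> collectFromArrays(Iterable<Vertex> vertices) {
        Set<String> all = new HashSet<>();
        for (Vertex v : vertices) {
            all.addAll(new ArrayList<>(Arrays.asList(v.getOperatorIDsAsArray())));
        }
        return all;
    }

    /** List getter: the returned list goes straight into addAll. */
    public static Set<String> collectFromLists(Iterable<Vertex> vertices) {
        Set<String> all = new HashSet<>();
        for (Vertex v : vertices) {
            all.addAll(v.getOperatorIDs());
        }
        return all;
    }
}
```

    Both methods produce the same set; the list variant simply skips the 
intermediate copy and also prevents callers from mutating the vertex's 
operator IDs.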


> Recover job state at the granularity of operator
> ------------------------------------------------
>
>                 Key: FLINK-5892
>                 URL: https://issues.apache.org/jira/browse/FLINK-5892
>             Project: Flink
>          Issue Type: New Feature
>          Components: State Backends, Checkpointing
>            Reporter: Guowei Ma
>            Assignee: Guowei Ma
>
> `JobGraph` has no `Operator` info, so `ExecutionGraph` can only recover at the 
> granularity of tasks.
> As a result, an operator of the job may not recover its state from a 
> savepoint even if the savepoint contains that operator's state. 
>  
> https://docs.google.com/document/d/19suTRF0nh7pRgeMnIEIdJ2Fq-CcNVt5_hR3cAoICf7Q/edit#.



