[ 
https://issues.apache.org/jira/browse/FLINK-3756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241098#comment-15241098
 ] 

ASF GitHub Bot commented on FLINK-3756:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1883#discussion_r59712024
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 ---
    @@ -139,13 +144,43 @@ public boolean acknowledgeTask(
                        
                        ExecutionVertex vertex = 
notYetAcknowledgedTasks.remove(attemptID);
                        if (vertex != null) {
    -                           if (state != null) {
    -                                   collectedStates.add(new StateForTask(
    -                                                   state,
    -                                                   stateSize,
    -                                                   vertex.getJobvertexId(),
    +                           if (state != null || kvState != null) {
    +
    +                                   JobVertexID jobVertexID = 
vertex.getJobvertexId();
    +
    +                                   StateForTaskGroup taskGroupState;
    +
    +                                   if 
(taskGroupStates.containsKey(jobVertexID)) {
    +                                           taskGroupState = 
taskGroupStates.get(jobVertexID);
    +                                   } else {
    +                                           taskGroupState = new 
StateForTaskGroup(jobVertexID, vertex.getTotalNumberOfParallelSubtasks());
    +                                           
taskGroupStates.put(jobVertexID, taskGroupState);
    +                                   }
    +
    +                                   long timestamp = 
System.currentTimeMillis() - checkpointTimestamp;
    +
    +                                   if (state != null) {
    +                                           taskGroupState.putState(
                                                        
vertex.getParallelSubtaskIndex(),
    -                                                   
System.currentTimeMillis() - checkpointTimestamp));
    +                                                   new StateForTask(
    +                                                           state,
    +                                                           stateSize,
    +                                                           timestamp
    +                                                   )
    +                                           );
    +                                   }
    +
    +                                   if (kvState != null) {
    +                                           for (Map.Entry<Integer, 
SerializedValue<StateHandle<?>>> entry : kvState.entrySet()) {
    +                                                   
taskGroupState.putKvState(
    +                                                           entry.getKey(),
    +                                                           new 
KvStateForTasks(
    +                                                                   
entry.getValue(),
    +                                                                   0L,
    --- End diff --
    
    Because at the moment the key-value state is still contained in the `state` 
object. This will be changed with the next PR introducing key groups for 
key-value state.


> Introduce state hierarchy in CheckpointCoordinator
> --------------------------------------------------
>
>                 Key: FLINK-3756
>                 URL: https://issues.apache.org/jira/browse/FLINK-3756
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager
>    Affects Versions: 1.1.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 1.1.0
>
>
> Currently, the {{CheckpointCoordinator}} stores the state handles for all 
> tasks in a list. This lack of structure does not allow to easily group all 
> tasks belonging to the same {{ExecutionJobVertex}} together. With the 
> introduction of key groups, it will be necessary to store the key-group state 
> handles of the different tasks grouped together to make them easily 
> accessible for redistribution. 
> Therefore, I propose to introduce the notion of a state for task groups which 
> groups the state for all tasks belonging to the same {{ExecutionJobVertex}} 
> together. This task group can then also be used to accumulate the key-group 
> states from the different tasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to