[ https://issues.apache.org/jira/browse/FLINK-3756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15241098#comment-15241098 ]
ASF GitHub Bot commented on FLINK-3756: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1883#discussion_r59712024 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java --- @@ -139,13 +144,43 @@ public boolean acknowledgeTask( ExecutionVertex vertex = notYetAcknowledgedTasks.remove(attemptID); if (vertex != null) { - if (state != null) { - collectedStates.add(new StateForTask( - state, - stateSize, - vertex.getJobvertexId(), + if (state != null || kvState != null) { + + JobVertexID jobVertexID = vertex.getJobvertexId(); + + StateForTaskGroup taskGroupState; + + if (taskGroupStates.containsKey(jobVertexID)) { + taskGroupState = taskGroupStates.get(jobVertexID); + } else { + taskGroupState = new StateForTaskGroup(jobVertexID, vertex.getTotalNumberOfParallelSubtasks()); + taskGroupStates.put(jobVertexID, taskGroupState); + } + + long timestamp = System.currentTimeMillis() - checkpointTimestamp; + + if (state != null) { + taskGroupState.putState( vertex.getParallelSubtaskIndex(), - System.currentTimeMillis() - checkpointTimestamp)); + new StateForTask( + state, + stateSize, + timestamp + ) + ); + } + + if (kvState != null) { + for (Map.Entry<Integer, SerializedValue<StateHandle<?>>> entry : kvState.entrySet()) { + taskGroupState.putKvState( + entry.getKey(), + new KvStateForTasks( + entry.getValue(), + 0L, --- End diff -- Because at the moment the key-value state is still contained in the `state` object. This will be changed with the next PR introducing key groups for key-value state. > Introduce state hierarchy in CheckpointCoordinator > -------------------------------------------------- > > Key: FLINK-3756 > URL: https://issues.apache.org/jira/browse/FLINK-3756 > Project: Flink > Issue Type: Sub-task > Components: JobManager > Affects Versions: 1.1.0 > Reporter: Till Rohrmann > Assignee: Till Rohrmann > Fix For: 1.1.0 > > > Currently, the {{CheckpointCoordinator}} stores the state handles for all > tasks in a list. This lack of structure does not allow to easily group all > tasks belonging to the same {{ExecutionJobVertex}} together. With the > introduction of key groups, it will be necessary to store the key-group state > handles of the different tasks grouped together to make them easily > accessible for redistribution. > Therefore, I propose to introduce the notion of a state for task groups which > groups the state for all tasks belonging to the same {{ExecutionJobVertex}} > together. This task group can then also be used to accumulate the key-group > states from the different tasks. -- This message was sent by Atlassian JIRA (v6.3.4#6332)