[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940108#comment-15940108 ]
ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107870871

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
@@ -290,19 +291,19 @@ private static void assignTaskStatesToOperatorInstances(
      * <p>
      * <p>This is publicly visible to be used in tests.
      */
-    public static List<KeyGroupsStateHandle> getKeyGroupsStateHandles(
-            Collection<KeyGroupsStateHandle> allKeyGroupsHandles,
-            KeyGroupRange subtaskKeyGroupIds) {
+    public static List<KeyedStateHandle> getKeyedStateHandles(
+            Collection<? extends KeyedStateHandle> keyedStateHandles,
+            KeyGroupRange subtaskKeyGroupRange) {
-        List<KeyGroupsStateHandle> subtaskKeyGroupStates = new ArrayList<>();
+        List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
-        for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) {
-            KeyGroupsStateHandle intersection = storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
-            if (intersection.getNumberOfKeyGroups() > 0) {
-                subtaskKeyGroupStates.add(intersection);
-            }
+        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+            KeyedStateHandle intersectedKeyedStateHandle = keyedStateHandle.getIntersection(subtaskKeyGroupRange);
+
+            subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
--- End diff --

I think we should only add a state handle if the key group range from the intersection is non-empty. Even though those empty handles are probably ignored later, I think it is cleaner.
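A minimal sketch of the reviewer's suggestion, assuming simplified stand-ins for Flink's key group range and keyed state handle (the nested types `KeyGroupRange`, `KeyedStateHandle` and `RangeOnlyHandle` below are hypothetical and are not Flink's actual classes). The intersection is computed as in the diff, but a handle is only added when its intersected key group range covers at least one key group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

public class KeyedStateHandleFilterSketch {

    /** Hypothetical stand-in for a key group range with inclusive bounds; end < start means empty. */
    static final class KeyGroupRange {
        final int start;
        final int end;

        KeyGroupRange(int start, int end) {
            this.start = start;
            this.end = end;
        }

        int getNumberOfKeyGroups() {
            return Math.max(0, end - start + 1);
        }

        KeyGroupRange getIntersection(KeyGroupRange other) {
            return new KeyGroupRange(Math.max(start, other.start), Math.min(end, other.end));
        }
    }

    /** Hypothetical stand-in for a keyed state handle. */
    interface KeyedStateHandle {
        KeyGroupRange getKeyGroupRange();

        KeyedStateHandle getIntersection(KeyGroupRange range);
    }

    /** Hypothetical handle that only records its key group range. */
    static final class RangeOnlyHandle implements KeyedStateHandle {
        private final KeyGroupRange range;

        RangeOnlyHandle(KeyGroupRange range) {
            this.range = range;
        }

        @Override
        public KeyGroupRange getKeyGroupRange() {
            return range;
        }

        @Override
        public KeyedStateHandle getIntersection(KeyGroupRange other) {
            return new RangeOnlyHandle(range.getIntersection(other));
        }
    }

    static List<KeyedStateHandle> getKeyedStateHandles(
            Collection<? extends KeyedStateHandle> keyedStateHandles,
            KeyGroupRange subtaskKeyGroupRange) {

        List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();

        for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
            KeyedStateHandle intersectedKeyedStateHandle =
                    keyedStateHandle.getIntersection(subtaskKeyGroupRange);

            // Skip handles whose intersection with the subtask's key group range is empty.
            if (intersectedKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups() > 0) {
                subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
            }
        }

        return subtaskKeyedStateHandles;
    }

    public static void main(String[] args) {
        List<KeyedStateHandle> handles = Arrays.asList(
                new RangeOnlyHandle(new KeyGroupRange(0, 4)),
                new RangeOnlyHandle(new KeyGroupRange(5, 9)));

        // The subtask owns key groups 0..3, so only the first handle yields a non-empty intersection.
        List<KeyedStateHandle> result = getKeyedStateHandles(handles, new KeyGroupRange(0, 3));
        System.out.println(result.size()); // prints 1
    }
}
```

Filtering at this point keeps empty handles out of the per-subtask lists entirely, so later stages do not need to rely on ignoring them.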
> Add KeyedStateHandle for the snapshots in keyed streams
> -------------------------------------------------------
>
>                 Key: FLINK-6034
>                 URL: https://issues.apache.org/jira/browse/FLINK-6034
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently, the only type of snapshot in keyed streams is
> {{KeyGroupsStateHandle}}, which is a full snapshot that stores the states one
> key group after another. With the introduction of incremental checkpointing, we
> need a higher-level abstraction of keyed snapshots to allow flexible snapshot
> formats.
> The implementation of {{KeyedStateHandle}}s may vary a lot across backends.
> The only information needed in a {{KeyedStateHandle}} is its key group range.
> When the job is recovered with a different degree of parallelism, each
> {{KeyedStateHandle}} will be assigned to the subtasks whose key group ranges
> overlap with its range.
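The assignment rule in the issue description can be sketched as follows, under stated assumptions: the handle interface exposes only a key group range, and two hypothetical implementations (`FullSnapshotHandle` and `IncrementalSnapshotHandle`, with made-up fields) illustrate that the snapshot format can differ per backend. The subtask key group ranges for the new parallelism are passed in explicitly rather than computed, and how a subtask then restores only its own key groups is backend-specific and not shown. None of these names are Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class KeyedStateAssignmentSketch {

    /** Hypothetical inclusive key group range [start, end]. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        /** Two inclusive ranges overlap if neither ends before the other starts. */
        boolean overlaps(Range other) {
            return start <= other.end && other.start <= end;
        }
    }

    /** The only information every keyed snapshot must expose is its key group range. */
    interface KeyedStateHandle {
        Range getKeyGroupRange();
    }

    /** Hypothetical full snapshot: one file storing the states key group after key group. */
    static final class FullSnapshotHandle implements KeyedStateHandle {
        private final Range range;
        private final String filePath;

        FullSnapshotHandle(Range range, String filePath) {
            this.range = range;
            this.filePath = filePath;
        }

        @Override
        public Range getKeyGroupRange() {
            return range;
        }
    }

    /** Hypothetical incremental snapshot: references a backend-specific set of files. */
    static final class IncrementalSnapshotHandle implements KeyedStateHandle {
        private final Range range;
        private final List<String> files;

        IncrementalSnapshotHandle(Range range, List<String> files) {
            this.range = range;
            this.files = files;
        }

        @Override
        public Range getKeyGroupRange() {
            return range;
        }
    }

    /** Assigns each handle to every subtask whose key group range overlaps the handle's range. */
    static List<List<KeyedStateHandle>> assign(
            List<? extends KeyedStateHandle> handles, List<Range> subtaskRanges) {
        List<List<KeyedStateHandle>> assignment = new ArrayList<>();
        for (Range subtaskRange : subtaskRanges) {
            List<KeyedStateHandle> forSubtask = new ArrayList<>();
            for (KeyedStateHandle handle : handles) {
                if (handle.getKeyGroupRange().overlaps(subtaskRange)) {
                    forSubtask.add(handle);
                }
            }
            assignment.add(forSubtask);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Snapshot taken with parallelism 2 over key groups 0..9, restored with parallelism 3.
        List<KeyedStateHandle> handles = Arrays.asList(
                new FullSnapshotHandle(new Range(0, 4), "full-0"),
                new IncrementalSnapshotHandle(new Range(5, 9), Arrays.asList("sst-1", "sst-2")));
        List<Range> subtaskRanges = Arrays.asList(new Range(0, 3), new Range(4, 6), new Range(7, 9));

        List<List<KeyedStateHandle>> assignment = assign(handles, subtaskRanges);
        for (int i = 0; i < assignment.size(); i++) {
            System.out.println("subtask " + i + ": " + assignment.get(i).size() + " handle(s)");
        }
        // subtask 0: 1 handle(s)
        // subtask 1: 2 handle(s)
        // subtask 2: 1 handle(s)
    }
}
```

Because the assignment only looks at key group ranges, it works the same way for both handle types, which is the point of introducing the common abstraction.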