[ https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940124#comment-15940124 ]
ASF GitHub Bot commented on FLINK-6034: --------------------------------------- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/3531#discussion_r107873583 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java --- @@ -412,9 +421,15 @@ private void restorePartitionedState(Collection<KeyGroupsStateHandle> state) thr } } - for (Tuple2<Integer, Long> groupOffset : keyGroupsHandle.getGroupRangeOffsets()) { + for (Tuple2<Integer, Long> groupOffset : keyGroupsStateHandle.getGroupRangeOffsets()) { int keyGroupIndex = groupOffset.f0; long offset = groupOffset.f1; + + // Skip those key groups that don't belong to the backend. + if (!keyGroupRange.contains(keyGroupIndex)) { --- End diff -- I think this is no longer required, because we are now "cutting out" the right key-groups from each state handle in the `StateAssignmentOperation`, using the `KeyedStateHandle::getIntersection(...)` method. In fact, I think that receiving a key-group that is not in the backend's range could now be considered as an error. We could rewrite this as a precondition check, or an assert. > Add KeyedStateHandle for the snapshots in keyed streams > ------------------------------------------------------- > > Key: FLINK-6034 > URL: https://issues.apache.org/jira/browse/FLINK-6034 > Project: Flink > Issue Type: Sub-task > Components: State Backends, Checkpointing > Reporter: Xiaogang Shi > Assignee: Xiaogang Shi > > Currently, the only type of the snapshots in keyed streams is > {{KeyGroupsStateHandle}} which is full and store the states one group after > another. With the introduction of incremental checkpointing, we need a higher > level abstraction of keyed snapshots to allow flexible snapshot formats. > The implementation of {{KeyedStateHandle}} s may vary a lot in different > backends. The only information needed in {{KeyedStateHandle}} s is their key > group range. When recovering the job with a different degree of parallelism, > {{KeyedStateHandle}} s will be assigned to those subtasks whose key group > ranges overlap with their ranges. -- This message was sent by Atlassian JIRA (v6.3.15#6346)