[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940124#comment-15940124
 ] 

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107873583
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
    @@ -412,9 +421,15 @@ private void 
restorePartitionedState(Collection<KeyGroupsStateHandle> state) thr
                                        }
                                }
     
    -                           for (Tuple2<Integer, Long> groupOffset : 
keyGroupsHandle.getGroupRangeOffsets()) {
    +                           for (Tuple2<Integer, Long> groupOffset : 
keyGroupsStateHandle.getGroupRangeOffsets()) {
                                        int keyGroupIndex = groupOffset.f0;
                                        long offset = groupOffset.f1;
    +
    +                                   // Skip those key groups that don't 
belong to the backend.
    +                                   if 
(!keyGroupRange.contains(keyGroupIndex)) {
    --- End diff --
    
    I think this is no longer required, because we are now "cutting out" the 
right key-groups from each state handle in the `StateAssignmentOperation`, 
using the `KeyedStateHandle::getIntersection(...)` method. In fact, I think 
that receiving a key-group that is not in the backend's range could now be 
considered as an error. We could rewrite this as a precondition check, or an 
assert.


> Add KeyedStateHandle for the snapshots in keyed streams
> -------------------------------------------------------
>
>                 Key: FLINK-6034
>                 URL: https://issues.apache.org/jira/browse/FLINK-6034
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently, the only type of the snapshots in keyed streams is 
> {{KeyGroupsStateHandle}} which is full and store the states one group after 
> another. With the introduction of incremental checkpointing, we need a higher 
> level abstraction of keyed snapshots to allow flexible snapshot formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to