[ 
https://issues.apache.org/jira/browse/FLINK-6034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15940108#comment-15940108
 ] 

ASF GitHub Bot commented on FLINK-6034:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107870871
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 ---
    @@ -290,19 +291,19 @@ private static void 
assignTaskStatesToOperatorInstances(
         * <p>
         * <p>This is publicly visible to be used in tests.
         */
    -   public static List<KeyGroupsStateHandle> getKeyGroupsStateHandles(
    -                   Collection<KeyGroupsStateHandle> allKeyGroupsHandles,
    -                   KeyGroupRange subtaskKeyGroupIds) {
    +   public static List<KeyedStateHandle> getKeyedStateHandles(
    +                   Collection<? extends KeyedStateHandle> 
keyedStateHandles,
    +                   KeyGroupRange subtaskKeyGroupRange) {
     
    -           List<KeyGroupsStateHandle> subtaskKeyGroupStates = new 
ArrayList<>();
    +           List<KeyedStateHandle> subtaskKeyedStateHandles = new 
ArrayList<>();
     
    -           for (KeyGroupsStateHandle storedKeyGroup : allKeyGroupsHandles) 
{
    -                   KeyGroupsStateHandle intersection = 
storedKeyGroup.getKeyGroupIntersection(subtaskKeyGroupIds);
    -                   if (intersection.getNumberOfKeyGroups() > 0) {
    -                           subtaskKeyGroupStates.add(intersection);
    -                   }
    +           for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +                   KeyedStateHandle intersectedKeyedStateHandle = 
keyedStateHandle.getIntersection(subtaskKeyGroupRange);
    +
    +                   
subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
    --- End diff --
    
    I think we should only add a state handle if the key group range from the 
intersection is non-empty. Even though those empty handles are probably ignored 
later, I think it is cleaner.


> Add KeyedStateHandle for the snapshots in keyed streams
> -------------------------------------------------------
>
>                 Key: FLINK-6034
>                 URL: https://issues.apache.org/jira/browse/FLINK-6034
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Xiaogang Shi
>            Assignee: Xiaogang Shi
>
> Currently, the only type of the snapshots in keyed streams is 
> {{KeyGroupsStateHandle}} which is full and store the states one group after 
> another. With the introduction of incremental checkpointing, we need a higher 
> level abstraction of keyed snapshots to allow flexible snapshot formats. 
> The implementation of {{KeyedStateHandle}} s may vary a lot in different 
> backends. The only information needed in {{KeyedStateHandle}} s is their key 
> group range. When recovering the job with a different degree of parallelism, 
> {{KeyedStateHandle}} s will be assigned to those subtasks whose key group 
> ranges overlap with their ranges.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to