Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3531#discussion_r107410978
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java ---
    @@ -306,6 +307,29 @@ private static void assignTaskStatesToOperatorInstances(
        }
     
        /**
    +    * Determine the subset of {@link KeyGroupsStateHandle KeyGroupsStateHandles} with correct
    +    * key group index for the given subtask {@link KeyGroupRange}.
    +    * <p>
    +    * <p>This is publicly visible to be used in tests.
    +    */
    +   public static List<KeyedStateHandle> getKeyedStateHandles(
    +                   Collection<? extends KeyedStateHandle> keyedStateHandles,
    +                   KeyGroupRange subtaskKeyGroupRange) {
    +
    +           List<KeyedStateHandle> subtaskKeyedStateHandles = new ArrayList<>();
    +
    +           for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
    +                   KeyGroupRange intersection = keyedStateHandle.getKeyGroupRange().getIntersection(subtaskKeyGroupRange);
    --- End diff --
    
    I wonder if we could somehow introduce a 
`KeyedStateHandle::intersect(KeyGroupRange)` that again returns a 
`KeyedStateHandle` with a `KeyGroupRange` that is the intersection of the 
original range and the argument. Basically a higher-level version of what the 
`KeyGroupsStateHandle` can do, where the concrete implementations (like 
`KeyGroupsStateHandle`) know how to virtually split themselves up into a 
sub-range. This would also transfer less data in the RPC (fewer offsets) and 
save the post-filtering in the backend.
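
    Roughly, the idea could look like the sketch below. It uses simplified 
stand-in types rather than the real Flink classes: `SimpleKeyGroupRange`, 
`SketchKeyedStateHandle`, `SketchKeyGroupsHandle` and `handlesForSubtask` are 
all hypothetical names, and the assumption that a handle stores exactly one 
offset per key group is made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

/** Simplified stand-in for a key-group range: inclusive [start, end]. Not Flink's class. */
final class SimpleKeyGroupRange {
    static final SimpleKeyGroupRange EMPTY = new SimpleKeyGroupRange(0, -1);
    final int start;
    final int end;

    SimpleKeyGroupRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    int size() {
        return Math.max(0, end - start + 1);
    }

    SimpleKeyGroupRange intersect(SimpleKeyGroupRange other) {
        int s = Math.max(start, other.start);
        int e = Math.min(end, other.end);
        return s <= e ? new SimpleKeyGroupRange(s, e) : EMPTY;
    }
}

/** Hypothetical handle interface carrying the proposed intersect method. */
interface SketchKeyedStateHandle {
    SimpleKeyGroupRange getKeyGroupRange();

    /** Returns a handle restricted to the overlap, or null if there is no overlap. */
    SketchKeyedStateHandle intersect(SimpleKeyGroupRange range);
}

/** Stand-in for a key-group-based handle: one stream offset per key group (assumption). */
final class SketchKeyGroupsHandle implements SketchKeyedStateHandle {
    private final SimpleKeyGroupRange range;
    private final long[] offsets;

    SketchKeyGroupsHandle(SimpleKeyGroupRange range, long[] offsets) {
        if (offsets.length != range.size()) {
            throw new IllegalArgumentException("expected one offset per key group");
        }
        this.range = range;
        this.offsets = offsets.clone();
    }

    @Override
    public SimpleKeyGroupRange getKeyGroupRange() {
        return range;
    }

    long[] getOffsets() {
        return offsets.clone();
    }

    @Override
    public SketchKeyedStateHandle intersect(SimpleKeyGroupRange other) {
        SimpleKeyGroupRange overlap = range.intersect(other);
        if (overlap.size() == 0) {
            return null;
        }
        // Keep only the offsets of key groups inside the overlap, so less data is shipped.
        long[] sub = Arrays.copyOfRange(
                offsets, overlap.start - range.start, overlap.end - range.start + 1);
        return new SketchKeyGroupsHandle(overlap, sub);
    }
}

final class IntersectSketch {
    /** Assignment loop that delegates the splitting to the handles themselves. */
    static List<SketchKeyedStateHandle> handlesForSubtask(
            Collection<? extends SketchKeyedStateHandle> handles,
            SimpleKeyGroupRange subtaskRange) {
        List<SketchKeyedStateHandle> result = new ArrayList<>();
        for (SketchKeyedStateHandle handle : handles) {
            SketchKeyedStateHandle part = handle.intersect(subtaskRange);
            if (part != null) {
                result.add(part);
            }
        }
        return result;
    }
}
```

    With this shape, the assignment code never touches range objects 
directly, and each subtask only receives the offsets for its own key groups.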
    
    Otherwise, we could have a boolean method that just checks for 
intersection. There is no need to create `KeyGroupRange` objects anymore, 
because we do not actually use them.
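
    A minimal sketch of such a check, again on a simplified stand-in type 
(`InclusiveRange` and `intersects` are hypothetical names, not existing Flink 
API):

```java
/** Simplified stand-in for an inclusive key-group range; not Flink's KeyGroupRange. */
final class InclusiveRange {
    final int start;
    final int end;

    InclusiveRange(int start, int end) {
        this.start = start;
        this.end = end;
    }

    boolean isEmpty() {
        return start > end;
    }

    /** Overlap test that allocates no intermediate range object. */
    boolean intersects(InclusiveRange other) {
        return !isEmpty() && !other.isEmpty()
                && start <= other.end && other.start <= end;
    }
}
```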

