curcur commented on a change in pull request #16714:
URL: https://github.com/apache/flink/pull/16714#discussion_r685748071



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
##########
@@ -123,9 +128,28 @@
         return mergeMapList;
     }
 
-    /** Collect union states from given parallelSubtaskStates. */
     private Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>
             collectUnionStates(List<List<OperatorStateHandle>> 
parallelSubtaskStates) {
+        return collectStates(parallelSubtaskStates, 
OperatorStateHandle.Mode.UNION);
+    }
+
+    private Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>
+            collectPartlyFinishedBroadcastStates(
+                    List<List<OperatorStateHandle>> parallelSubtaskStates) {
+        return collectStates(parallelSubtaskStates, 
OperatorStateHandle.Mode.BROADCAST).entrySet()
+                .stream()
+                .filter(
+                        e ->
+                                e.getValue().size() > 0
+                                        && e.getValue().size() < 
parallelSubtaskStates.size())
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
+    /** Collect union states from given parallelSubtaskStates. */

Review comment:
       change the comment to reflect what the function does right now?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
##########
@@ -123,9 +128,28 @@
         return mergeMapList;
     }
 
-    /** Collect union states from given parallelSubtaskStates. */
     private Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>
             collectUnionStates(List<List<OperatorStateHandle>> 
parallelSubtaskStates) {
+        return collectStates(parallelSubtaskStates, 
OperatorStateHandle.Mode.UNION);
+    }
+
+    private Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>
+            collectPartlyFinishedBroadcastStates(
+                    List<List<OperatorStateHandle>> parallelSubtaskStates) {
+        return collectStates(parallelSubtaskStates, 
OperatorStateHandle.Mode.BROADCAST).entrySet()
+                .stream()
+                .filter(
+                        e ->
+                                e.getValue().size() > 0
+                                        && e.getValue().size() < 
parallelSubtaskStates.size())

Review comment:
       `e.getValue().size() > 0` means `not empty`?
   what does `e.getValue().size() < parallelSubtaskStates.size()` mean?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
##########
@@ -123,9 +128,28 @@
         return mergeMapList;
     }
 
-    /** Collect union states from given parallelSubtaskStates. */
     private Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>
             collectUnionStates(List<List<OperatorStateHandle>> 
parallelSubtaskStates) {
+        return collectStates(parallelSubtaskStates, 
OperatorStateHandle.Mode.UNION);
+    }
+
+    private Map<String, List<Tuple2<StreamStateHandle, 
OperatorStateHandle.StateMetaInfo>>>
+            collectPartlyFinishedBroadcastStates(
+                    List<List<OperatorStateHandle>> parallelSubtaskStates) {
+        return collectStates(parallelSubtaskStates, 
OperatorStateHandle.Mode.BROADCAST).entrySet()
+                .stream()
+                .filter(
+                        e ->
+                                e.getValue().size() > 0
+                                        && e.getValue().size() < 
parallelSubtaskStates.size())

Review comment:
       My guess is that the empty state should be `e.getValue().size() == 0` ?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to