zhuzhurk commented on code in PR #24087:
URL: https://github.com/apache/flink/pull/24087#discussion_r1453369184


##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java:
##########
@@ -408,6 +410,27 @@ public Collection<OperatorCoordinatorHolder> 
getOperatorCoordinators() {
         return operatorCoordinators;
     }
 
+    public List<SourceCoordinator<?, ?>> getSourceCoordinators() {
+        checkState(operatorCoordinators != null);

Review Comment:
   The field `operatorCoordinators` is no longer nullable since this commit and 
we can change it to final.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java:
##########
@@ -536,6 +537,25 @@ public void testListeningEventsFromOtherCoordinators() 
throws Exception {
         assertThat(store.get(listeningID)).isNotNull().isSameAs(coordinator);
     }
 
+    @Test
+    public void testInferSourceParallelismAsync() throws Exception {
+        final String listeningID = "testListeningID";
+
+        class TestDynamicFilteringEvent implements SourceEvent, 
DynamicFilteringInfo {}
+
+        CoordinatorStore store = new CoordinatorStoreImpl();
+        store.putIfAbsent(listeningID, new SourceEventWrapper(new 
TestDynamicFilteringEvent()));
+        final SourceCoordinator<?, ?> coordinator =
+                new SourceCoordinator<>(
+                        OPERATOR_NAME,
+                        createMockSource(),
+                        context,
+                        store,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        listeningID);
+        assertThat(coordinator.inferSourceParallelismAsync(2, 
1).get()).isEqualTo(1);

Review Comment:
   Can we modify the case to to return a value other than 1? `1` is a bit too 
common that it may hide some problems. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to