zhuzhurk commented on code in PR #24087: URL: https://github.com/apache/flink/pull/24087#discussion_r1453369184
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java:
##########
@@ -408,6 +410,27 @@ public Collection<OperatorCoordinatorHolder> getOperatorCoordinators() {
         return operatorCoordinators;
     }
 
+    public List<SourceCoordinator<?, ?>> getSourceCoordinators() {
+        checkState(operatorCoordinators != null);

Review Comment:
   The field `operatorCoordinators` is no longer nullable as of this commit, so we can make it `final`.

##########
flink-runtime/src/test/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorTest.java:
##########
@@ -536,6 +537,25 @@ public void testListeningEventsFromOtherCoordinators() throws Exception {
         assertThat(store.get(listeningID)).isNotNull().isSameAs(coordinator);
     }
 
+    @Test
+    public void testInferSourceParallelismAsync() throws Exception {
+        final String listeningID = "testListeningID";
+
+        class TestDynamicFilteringEvent implements SourceEvent, DynamicFilteringInfo {}
+
+        CoordinatorStore store = new CoordinatorStoreImpl();
+        store.putIfAbsent(listeningID, new SourceEventWrapper(new TestDynamicFilteringEvent()));
+        final SourceCoordinator<?, ?> coordinator =
+                new SourceCoordinator<>(
+                        OPERATOR_NAME,
+                        createMockSource(),
+                        context,
+                        store,
+                        WatermarkAlignmentParams.WATERMARK_ALIGNMENT_DISABLED,
+                        listeningID);
+        assertThat(coordinator.inferSourceParallelismAsync(2, 1).get()).isEqualTo(1);

Review Comment:
   Can we modify the case to return a value other than 1? `1` is such a common value that it may hide some problems.
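
To illustrate the concern about asserting on `1`, here is a minimal, self-contained sketch. It does not use Flink's `SourceCoordinator`; the class `ParallelismInferenceSketch` and both methods are hypothetical stand-ins, and the "cap the suggestion by a limit" rule is an assumption made only for this example. It shows how a fallback-to-1 bug can pass a test that expects `1` but fail one that expects a less common value.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in, not Flink code: it only mimics the shape of an
// asynchronous call that returns an inferred parallelism.
class ParallelismInferenceSketch {

    // Assumed intended behavior for this sketch: take the suggested
    // parallelism, capped by the given limit.
    static CompletableFuture<Integer> inferCorrect(int suggested, int limit) {
        return CompletableFuture.completedFuture(Math.min(suggested, limit));
    }

    // Buggy variant: ignores its inputs and always falls back to 1.
    static CompletableFuture<Integer> inferBuggy(int suggested, int limit) {
        return CompletableFuture.completedFuture(1);
    }

    public static void main(String[] args) {
        // Expected value 1: both variants agree, so the bug stays hidden.
        System.out.println(inferCorrect(1, 2).join() == 1); // true
        System.out.println(inferBuggy(1, 2).join() == 1);   // true

        // Expected value 2: only the correct variant matches.
        System.out.println(inferCorrect(5, 2).join() == 2); // true
        System.out.println(inferBuggy(5, 2).join() == 2);   // false
    }
}
```

With an expected value of `2`, the always-1 fallback is caught immediately, which is the kind of problem a test that expects `1` could miss.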