Hi devs, I'd like to start a discussion on adding support for fixed-delay asynchronous calls in SplitEnumeratorContext [1].
Because it's a public API, I'm interested in your thoughts:
1. Is this functionality needed in the current design?
2. Would this require a small FLIP, or is a Jira ticket enough?

Currently, org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync [2] natively supports only fixed-rate scheduling for asynchronous calls, which can lead to task accumulation if individual calls take too long to complete.

Take the Kafka connector as an example. Partition discovery in the Kafka enumerator works as follows:
1. The worker thread calls getSubscribedTopicPartitions at a fixed rate.
2. After getSubscribedTopicPartitions returns, the coordinator thread runs checkPartitionChanges.
3. If a new partition is found, the worker thread runs initializePartitionSplits.
4. Finally, the coordinator thread assigns the split.

If a single `getSubscribedTopicPartitions` call takes too much time, further getSubscribedTopicPartitions calls keep piling up on the worker thread at the fixed rate. initializePartitionSplits then runs only long after the new partition was found, so the partition is assigned and read much later than it was created.

If SplitEnumeratorContext supported fixed-delay async calls, this problem would be solved.

I'm looking forward to your feedback.

Best,
Hongshun

[1] https://issues.apache.org/jira/browse/FLINK-38335
[2] https://github.com/loserwang1024/flink/blob/5628c7097875be4bd56fc7805dbdd727d92bdac7/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java#L127
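
P.S. To make the difference between the two scheduling modes concrete, here is a minimal sketch using only java.util.concurrent, not the Flink API. It assumes a single worker thread and a discovery call that takes longer than its period; the class name FixedDelayDemo, the method minIdleGapMillis, and the timing values are hypothetical and chosen only for illustration. It measures how long the worker thread stays idle between two consecutive runs of the slow task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayDemo {

    /**
     * Runs a slow periodic task on a single worker thread and returns the smallest
     * idle gap (in ms) between the end of one run and the start of the next.
     * Hypothetical helper for illustration only; expects runs >= 2.
     */
    static long minIdleGapMillis(boolean fixedDelay, long periodMs, long taskMs, int runs) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        List<long[]> spans = new ArrayList<>(); // each entry: {startNanos, endNanos}
        CountDownLatch done = new CountDownLatch(runs);

        // Stand-in for a slow discovery call such as getSubscribedTopicPartitions.
        Runnable slowDiscovery = () -> {
            long start = System.nanoTime();
            try {
                Thread.sleep(taskMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return; // executor is shutting down
            }
            long end = System.nanoTime();
            synchronized (spans) {
                spans.add(new long[] {start, end});
            }
            done.countDown();
        };

        if (fixedDelay) {
            // The next run starts periodMs after the previous run has finished.
            worker.scheduleWithFixedDelay(slowDiscovery, 0, periodMs, TimeUnit.MILLISECONDS);
        } else {
            // The next run is due periodMs after the previous scheduled start, so
            // overdue runs start as soon as the worker thread is free.
            worker.scheduleAtFixedRate(slowDiscovery, 0, periodMs, TimeUnit.MILLISECONDS);
        }

        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
        }

        long minGapNanos = Long.MAX_VALUE;
        synchronized (spans) {
            for (int i = 1; i < runs; i++) {
                long gap = spans.get(i)[0] - spans.get(i - 1)[1];
                minGapNanos = Math.min(minGapNanos, gap);
            }
        }
        return TimeUnit.NANOSECONDS.toMillis(minGapNanos);
    }

    public static void main(String[] args) {
        // Period 50 ms, task duration 120 ms, 4 runs per mode.
        System.out.println("fixed rate,  min idle gap ms: " + minIdleGapMillis(false, 50, 120, 4));
        System.out.println("fixed delay, min idle gap ms: " + minIdleGapMillis(true, 50, 120, 4));
    }
}
```

With fixed rate, overdue runs start back to back, so the worker thread has almost no idle time in which other work (in the Kafka case, initializePartitionSplits) could run. With fixed delay, the worker is guaranteed at least one full delay of idle time after every run. Whether a fixed-delay variant of callAsync should be built this way is exactly what I'd like to discuss.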