Hi devs,

I'd like to start a discussion on adding support for fixed-delay
asynchronous calls in SplitEnumeratorContext [1].

Because it's a public API, I'm interested in your thoughts:
1. Is this functionality needed in the current design?
2. Would this require a small FLIP, or is a Jira ticket enough?

Currently,
org.apache.flink.api.connector.source.SplitEnumeratorContext#callAsync [2]
natively supports only fixed-rate scheduling for asynchronous calls, which
can lead to task accumulation if individual calls take too long to complete.

Take the Kafka connector as an example. Partition discovery in the Kafka
enumerator proceeds as follows:
1. The worker thread calls getSubscribedTopicPartitions at a fixed rate.
2. After getSubscribedTopicPartitions returns, the coordinator thread calls
checkPartitionChanges.
3. If a new partition is found, the worker thread calls
initializePartitionSplits.
4. Finally, the coordinator thread assigns the splits.

If a single `getSubscribedTopicPartitions` call takes too much time, many
more getSubscribedTopicPartitions calls pile up on the worker thread,
because they keep being scheduled at a fixed rate.
initializePartitionSplits can then run only long after the new partition
was discovered. As a result, the time at which a partition is assigned and
first read lags far behind the time at which it was created.


If SplitEnumeratorContext supported fixed-delay asynchronous calls, this
problem would be solved, because the next call would be scheduled only
after the previous one has completed.
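
As a starting point for the discussion, one possible shape could look like
the sketch below. It is not an existing Flink API: the class
FixedDelayNotifier and the method callAsyncWithFixedDelay are hypothetical
names, and the two executors only stand in for the worker and coordinator
threads. The parameter list mirrors the existing callAsync (callable,
handler, initial delay, interval), with the interval interpreted as a delay
after completion rather than a period.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

// Hypothetical sketch, not Flink code: a fixed-delay variant of the callAsync pattern.
final class FixedDelayNotifier implements AutoCloseable {
    // Stand-ins for the enumerator's worker thread and coordinator thread.
    private final ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    /** Runs `callable` on the worker, then hands the result or error to `handler` on the coordinator. */
    <T> void callAsyncWithFixedDelay(
            Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelayMs, long delayMs) {
        worker.scheduleWithFixedDelay(
                () -> {
                    T result;
                    try {
                        result = callable.call();
                    } catch (Throwable t) {
                        coordinator.execute(() -> handler.accept(null, t));
                        return;
                    }
                    coordinator.execute(() -> handler.accept(result, null));
                },
                initialDelayMs,
                delayMs, // counted from the end of one call to the start of the next
                TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        worker.shutdownNow();
        coordinator.shutdownNow();
    }

    /** Collects the results of the first `calls` invocations, in handler order. */
    static List<Integer> demo(int calls) {
        AtomicInteger counter = new AtomicInteger();
        List<Integer> results = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(calls);
        try (FixedDelayNotifier notifier = new FixedDelayNotifier()) {
            notifier.<Integer>callAsyncWithFixedDelay(
                    counter::incrementAndGet,
                    (value, error) -> {
                        if (error == null && done.getCount() > 0) {
                            results.add(value);
                            done.countDown();
                        }
                    },
                    0,
                    10);
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        System.out.println(demo(3));
    }
}
```

Whether this should be a new method or a flag on the existing callAsync is
part of what I'd like feedback on.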

I'm looking forward to your feedback.

Best,
Hongshun

[1] https://issues.apache.org/jira/browse/FLINK-38335
[2] https://github.com/loserwang1024/flink/blob/5628c7097875be4bd56fc7805dbdd727d92bdac7/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/ExecutorNotifier.java#L127
