[ https://issues.apache.org/jira/browse/FLINK-22147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiangjie Qin resolved FLINK-22147.
----------------------------------
    Fix Version/s: 1.14.0
       Resolution: Implemented

Merged to master.
1418a1ddd025adb3b502b8d7a89d0f338aa40c29

> Refactor Partition Discovery Logic in KafkaSourceEnumerator
> -----------------------------------------------------------
>
>                 Key: FLINK-22147
>                 URL: https://issues.apache.org/jira/browse/FLINK-22147
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / Kafka
>    Affects Versions: 1.13.0
>            Reporter: Qingsheng Ren
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> Currently the logic of partition discovery is: the worker thread checks
> whether there are new partitions and initializes new splits if so, then the
> coordinator thread marks these splits as pending and tries to make
> assignments.
> Under the current design, the worker thread needs to keep an internal data
> structure tracking already discovered partitions, which duplicates the
> pending splits + assigned partitions tracked by the coordinator thread.
> This kind of double bookkeeping is usually fragile.
> Another issue is that the worker thread always fetches descriptions of ALL
> topics at partition discovery, which becomes a problem when working with
> giant Kafka clusters with millions of topics/partitions.
> To fix the issues above, the partition discovery logic in the Kafka
> enumerator needs a refactor. Basically the logic can be changed to:
> # The worker thread fetches descriptions of subscribed topics/partitions,
> then hands them over to the coordinator thread
> # The coordinator thread filters out already discovered partitions (pending
> + assigned partitions), then invokes the worker thread with {{callAsync}} to
> fetch offsets for new partitions
> # The worker thread fetches offsets and creates splits for new partitions,
> then hands the new splits over to the coordinator thread
> # The coordinator thread marks these splits as pending and tries to make
> assignments.
> Discussion of this issue can be found in
> [https://github.com/apache/flink/pull/15461].



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
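
The four-step handoff described in the issue can be sketched roughly as below. This is only an illustration under stated assumptions, not the code merged for FLINK-22147: the class PartitionDiscoverySketch and all of its methods are hypothetical, partitions are plain strings, offsets are a placeholder 0, and two single-thread executors chained with CompletableFuture stand in for the worker thread, the coordinator thread and {{callAsync}}.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical stand-in for the enumerator; NOT the Flink KafkaSourceEnumerator. */
public class PartitionDiscoverySketch {
    // Bookkeeping lives only on the coordinator side, so there is a single
    // source of truth for "already discovered" partitions.
    private final Set<String> assignedPartitions = new HashSet<>();
    private final Map<String, Long> pendingSplits = new HashMap<>(); // partition -> start offset

    // Assumption: one thread each, imitating the worker and coordinator threads.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    public PartitionDiscoverySketch(Set<String> alreadyAssigned) {
        assignedPartitions.addAll(alreadyAssigned);
    }

    // Step 1 (worker): describe only the subscribed topics. Hard-coded here;
    // a real enumerator would query the Kafka cluster.
    private Set<String> fetchSubscribedPartitions() {
        return Set.of("orders-0", "orders-1", "orders-2");
    }

    // Step 2 (coordinator): drop partitions that are already pending or assigned.
    private Set<String> filterNewPartitions(Set<String> fetched) {
        Set<String> fresh = new HashSet<>(fetched);
        fresh.removeAll(assignedPartitions);
        fresh.removeAll(pendingSplits.keySet());
        return fresh;
    }

    // Step 3 (worker): fetch offsets and create splits for the new partitions.
    private Map<String, Long> createSplits(Set<String> newPartitions) {
        Map<String, Long> splits = new HashMap<>();
        for (String partition : newPartitions) {
            splits.put(partition, 0L); // placeholder starting offset
        }
        return splits;
    }

    // Step 4 (coordinator): mark the new splits as pending; returns a snapshot.
    private Map<String, Long> markPending(Map<String, Long> splits) {
        pendingSplits.putAll(splits);
        return new HashMap<>(pendingSplits);
    }

    /** Runs one discovery round, alternating between the two threads. */
    public CompletableFuture<Map<String, Long>> discover() {
        return CompletableFuture.supplyAsync(this::fetchSubscribedPartitions, worker)
                .thenApplyAsync(this::filterNewPartitions, coordinator)
                .thenApplyAsync(this::createSplits, worker)
                .thenApplyAsync(this::markPending, coordinator);
    }

    public void close() {
        worker.shutdown();
        coordinator.shutdown();
    }

    public static void main(String[] args) {
        PartitionDiscoverySketch sketch = new PartitionDiscoverySketch(Set.of("orders-0"));
        // Only the partitions that are not yet assigned end up pending.
        System.out.println(sketch.discover().join().keySet());
        sketch.close();
    }
}
```

Because the worker steps receive everything they need as arguments and return fresh collections, the worker keeps no record of discovered partitions in this sketch, which is the double bookkeeping the issue wants to remove. Running a second discovery round leaves the pending set unchanged, since the coordinator filters out everything it already tracks.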