Hi Becket,

Thank you very much for your advice; it helped me a lot.

> It seems that we don't need the method `SourceReader.getAssignedSplits()`. The assigned splits are available in the SourceCoordinator upon state restoration.

SourceCoordinator doesn't store the splits that have already been assigned to readers, and SplitAssignmentTracker only stores the splits assigned within the current checkpoint, which are removed after that checkpoint. Maybe you mean SourceOperator? I also considered using it at first. However, there are two situations where it is not enough:

1. A topic or table is removed from the source options before a restart. Some connectors, such as MySQL CDC, remove the unused splits after restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured topics change, the splits of removed topics are still read. I want to do the same thing in Kafka.
2. In Kafka or MySQL CDC, bounded splits that have already finished can be removed after restart.

In these cases, I have to get the assigned splits after SourceReader#addSplits, rather than directly from SourceOperator.

> By design, the SplitEnumerator can get the reader information at any time from `SplitEnumeratorContext.registeredReaders()`.

This looks good to me. Thanks again.

Best,
Hongshun

[1] https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238

On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]> wrote:
> Hi Hongshun,
>
> Thanks for the proposal. The current Kafka split assignment algorithm does seem to have issues. (I cannot recall why it was implemented this way at that time...)
>
> Two quick comments:
> 1. It seems that we don't need the method `SourceReader.getAssignedSplits()`. The assigned splits are available in the SourceCoordinator upon state restoration and can be put into the ReaderRegistrationEvent.
> 2. Instead of adding the method `SplitEnumerator.addReader(int subtaskId, List<SplitT> assignedSplits)`, add a new field of `InitialSplitAssignment` to the ReaderInfo. By design, the SplitEnumerator can get the reader information at any time from `SplitEnumeratorContext.registeredReaders()`. This also spares the Enumerator implementation from remembering the initially assigned splits if it wants to wait until all the readers are registered. It also allows future additions to the reader information.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <[email protected]> wrote:
>> Could anyone familiar with the Kafka connector help review this FLIP? I am looking forward to your reply.
>>
>> Best,
>> Hongshun
>>
>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]> wrote:
>>> Thanks Hongshun for driving this work.
>>>
>>> We also suffer from this issue when restoring Kafka sources in production. The current design is a nice tradeoff and takes the new Source implementation details into account. +1 from my side.
>>>
>>> Best,
>>> Leonard
>>>
>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <[email protected]> wrote:
>>> >
>>> > Hi devs,
>>> >
>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with Global Split Assignment Distribution for Balanced Split Assignment] [1], which addresses critical limitations in the current split distribution mechanism of our Kafka connector.
>>> >
>>> > As documented in [FLINK-31762] [2], several scenarios currently lead to uneven Kafka split distribution, causing reader delays and performance bottlenecks. The core issue stems from the enumerator's lack of visibility into the split distribution after assignment.
>>> >
>>> > This FLIP does two things:
>>> > 1. ReaderRegistrationEvent enhancement: SourceOperator should send a ReaderRegistrationEvent carrying the assigned splits' metadata after startup to ensure state consistency.
>>> > 2. Implementation in the Kafka connector to resolve imbalanced split assignment and the lack of state awareness during recovery (the enumerator will always choose the least-assigned subtask, and the reasoning is also as follows).
>>> >
>>> > Do you have any additional questions regarding this FLIP? Looking forward to hearing from you.
>>> >
>>> > Best,
>>> > Hongshun
>>> >
>>> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
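
To illustrate the balancing rule in the FLIP (the enumerator always chooses the least-assigned subtask) together with Becket's suggestion to expose each reader's initial split assignment through the reader information, here is a minimal, self-contained Java sketch. It deliberately avoids Flink classes: `LeastAssignedSplitAssigner`, `registerReader`, `assign`, and `splitCount` are hypothetical names, splits are plain strings, and the `initialSplits` argument only stands in for the proposed `InitialSplitAssignment` field, which is not an existing Flink API. Breaking ties by the lowest subtask id is an assumption of this sketch, not something the thread specifies.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical enumerator-side helper (not a Flink API): every new split goes to the
 * registered subtask that currently owns the fewest splits.
 */
final class LeastAssignedSplitAssigner {

    // Number of splits currently owned by each registered subtask.
    private final Map<Integer, Integer> splitCounts = new HashMap<>();

    /**
     * Registers a reader together with the splits it restored from state. The list
     * stands in for the proposed InitialSplitAssignment field of ReaderInfo (assumption).
     */
    void registerReader(int subtaskId, List<String> initialSplits) {
        splitCounts.put(subtaskId, initialSplits.size());
    }

    /** Assigns each new split to the least-assigned subtask; returns subtask -> splits. */
    Map<Integer, List<String>> assign(List<String> newSplits) {
        Map<Integer, List<String>> result = new HashMap<>();
        for (String split : newSplits) {
            int target = leastAssignedSubtask();
            result.computeIfAbsent(target, k -> new ArrayList<>()).add(split);
            splitCounts.merge(target, 1, Integer::sum);
        }
        return result;
    }

    /** Current number of splits owned by a subtask (0 if it is not registered). */
    int splitCount(int subtaskId) {
        return splitCounts.getOrDefault(subtaskId, 0);
    }

    // Picks the subtask with the fewest splits; ties go to the lowest subtask id.
    private int leastAssignedSubtask() {
        if (splitCounts.isEmpty()) {
            throw new IllegalStateException("no registered readers");
        }
        int best = -1;
        int bestCount = Integer.MAX_VALUE;
        for (Map.Entry<Integer, Integer> e : splitCounts.entrySet()) {
            int id = e.getKey();
            int count = e.getValue();
            if (count < bestCount || (count == bestCount && id < best)) {
                best = id;
                bestCount = count;
            }
        }
        return best;
    }
}
```

For example, if subtasks 0, 1 and 2 restore 3, 0 and 1 splits respectively, four newly discovered splits go to subtasks 1, 1, 2 and 1, leaving the subtasks with 3, 3 and 2 splits. Seeding the counts from the restored assignment is what lets the enumerator balance across a restart instead of only across the splits it assigned itself.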

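Hongshun's two situations (splits of topics removed from the source options, and bounded splits that have already finished) amount to filtering the restored splits inside `SourceReader#addSplits` and only then reporting what remains. The Java sketch below is a minimal, self-contained illustration of that idea under stated assumptions: `SplitSketch`, its `finished` flag, and `RestoredSplitFilter` are hypothetical stand-ins, not the Kafka connector's split classes or Flink's `SourceReader` interface, and the `MySqlSourceReader#addSplits` code linked in Hongshun's reply may work differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/** Hypothetical split: a topic partition plus a flag marking a finished bounded split. */
record SplitSketch(String topic, int partition, boolean finished) {}

/** Hypothetical reader-side filter applied when restored splits are added. */
final class RestoredSplitFilter {

    private final Set<String> configuredTopics;
    private final List<SplitSketch> assigned = new ArrayList<>();

    RestoredSplitFilter(Set<String> configuredTopics) {
        this.configuredTopics = Set.copyOf(configuredTopics);
    }

    /** Keeps only the splits that still need to be read (same idea as filtering in addSplits). */
    void addSplits(List<SplitSketch> splits) {
        for (SplitSketch split : splits) {
            if (!configuredTopics.contains(split.topic())) {
                continue; // situation 1: the topic was removed from the source options
            }
            if (split.finished()) {
                continue; // situation 2: a bounded split that has already been fully read
            }
            assigned.add(split);
        }
    }

    /** The splits the reader would report as assigned once addSplits has run. */
    List<SplitSketch> assignedSplits() {
        return List.copyOf(assigned);
    }
}
```

Because the filtering happens on the reader, only the reader knows the surviving set, which is why the thread argues that the assigned splits must be taken after `addSplits` rather than directly from the state restored in SourceOperator.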