Hi Becket,
I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues. 1. High Transmission Cost If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission — especially when many splits are involved. 2. Risk of Split Loss Risk of split loss exists unless we have a mechanism to make sure only can checkpoint after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery: Scenario 1: 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4). 2. The enumerator receives these reports but only reassigns splits 1 and 2 — not 3 and 4. 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted. 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost. Scenario 2: 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart. 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered. 3. Since no splits have been reassigned yet, both readers have empty states. 4. When restarting from this checkpoint, all four splits are lost. Let me know if you have thoughts on how we might mitigate these risks! Best Hongshun On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote: > Hi Hongshun, > > The steps sound reasonable to me in general. In terms of the updated FLIP > wiki, it would be good to see if we can keep the protocol simple. One > alternative way to achieve this behavior is following: > > 1. Upon SourceOperator startup, the SourceOperator sends > ReaderRegistrationEvent with the currently assigned splits to the > enumerator. It does not add these splits to the SourceReader. > 2. The enumerator will always use the > SourceEnumeratorContext.assignSplits() to assign the splits. (not via the > response of the SourceRegistrationEvent, this allows async split assignment > in case the enumerator wants to wait until all the readers are registered) > 3. The SourceOperator will only call SourceReader.addSplits() when it > receives the AddSplitEvent from the enumerator. > > This protocol has a few benefits: > 1. it basically allows arbitrary split reassignment upon restart > 2. simplicity: there is only one way to assign splits. > > So we only need one interface change: > - add the initially assigned splits to ReaderInfo so the Enumerator can > access it. > and one behavior change: > - The SourceOperator should stop assigning splits to the from state > restoration, but only do that when it receives AddSplitsEvent from the > enumerator. > > The enumerator story is also simple: > 1. Receive some kind of notification (new partition, new reader, etc) > 2. look at the reader information (in the enumerator context or > self-maintained state) > 3. assign splits via the enumerator context. > > Thanks, > > Jiangjie (Becket) Qin > > On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <[email protected]> > wrote: > >> Hi Becket, >> Thanks for your advice — I’ve quickly learned a lot about the reader’s >> design principle. It’s really interesting! >> >> > One principle we want to follow is that the enumerator should be the >> brain doing the splits assignment, while the source readers read from the >> assigned splits. So we want to avoid the case where the SourceReader >> ignores the split assignment. >> >> It appears that MySQL CDC currently bypasses this principle by >> proactively removing unused splits directly in the SourceReader. This may >> be due to the lack of built-in framework support for such cleanup, forcing >> connectors to handle it manually. However, this responsibility ideally >> belongs in the framework. >> >> With this FLIP, we propose a redesigned mechanism that centralizes split >> cleanup logic in the SplitEnumerator, allowing connectors like MySQL CDC to >> eventually adopt it( @leneord, CC). >> >> >> To achieve this, we must carefully manage state consistency during >> startup and recovery. The proposed approach is as follows: >> >> 1. Reader Registration with Deferred Assignment >> When a reader starts (SourceOperator#open), it sends a >> ReaderRegistrationEvent to the SplitEnumerator, including its >> previously assigned splits (restored from state). However, these splits >> are not yet assigned to the reader. The SourceOperator is placed in a >> PENDING state. >> 2. Prevent State Pollution During Registration >> While in the PENDING state, SourceOperator#snapshotState will not >> update the operator state. This prevents empty or outdated reader state >> (e.g., with removed splits) from polluting the checkpoint. >> 3. Enumerator Performs Split Cleanup and Acknowledges >> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator removes >> any splits that are no longer valid (e.g., due to removed topics or >> tables) >> and returns the list of remaining valid split IDs to the reader via a >> ReaderRegistrationACKEvent. >> For backward compatibility, the default behavior is to return all >> split IDs (i.e., no filtering). >> 4. Finalize Registration and Resume Normal Operation >> When the SourceOperator receives the ReaderRegistrationACKEvent, it >> assigns the confirmed splits to the reader and transitions its state to >> REGISTERED. From this point onward, SourceOperator#snapshotState can >> safely update the operator state. >> >> >> Best, >> Hongshun >> >> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <[email protected]> wrote: >> >>> SourceCoordinator doesn't store splits that have already been assigned >>>> to readers, and SplitAssignmentTracker stores the splits only for this >>>> checkpoint, which will be removed after checkpoint. Maybe you mean >>>> SourceOperator? >>> >>> Yes, I meant SourceOperator. >>> >>> At the beginning, I also thought about using it. However, there are two >>>> situations: >>>> 1. During restart, if source options remove a topic or table: sometimes >>>> connectors like MySQL CDC will remove unused splits after restart in >>>> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured >>>> topics change, removed topic's splits are still read. I also want to do the >>>> same thing in Kafka. >>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be >>>> removed after restart. >>>> In these cases, I have to get the assigned splits after >>>> SourceReader#addSplits, rather than get them from SourceOperator >>>> directly. >>> >>> >>> One principle we want to follow is that the enumerator should be the >>> brain doing the splits assignment, while the source readers read from the >>> assigned splits. So we want to avoid the case where the SourceReader >>> ignores the split assignment. Given this principle, >>> For case 1, if there is a subscription change, it might be better to >>> hold back calling SourceReader.addSplits() until an assignment is confirmed >>> by the Enumerator. In fact, this might be a good default behavior >>> regardless of whether there is a subscription change. >>> For case 2: if a bounded split is finished, the >>> SourceReader.snapshotState() will not contain that split. So upon >>> restoration, those splits should not appear, right? >>> >>> Thanks, >>> >>> Jiangjie (Becket) Qin >>> >>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <[email protected]> >>> wrote: >>> >>>> Hi Becket, >>>> >>>> Thank you a lot for your advice, which helped me a lot. >>>> > It seems that we don't need the method `SourceReader. >>>> getAssignedSplits()`. The assigned splits are available in the >>>> SourceCoordinator upon state restoration. >>>> >>>> SourceCoordinator doesn't store splits that have already been assigned >>>> to readers, and SplitAssignmentTracker stores the splits only for this >>>> checkpoint, which will be removed after checkpoint. Maybe you mean >>>> SourceOperator? >>>> >>>> At the beginning, I also thought about using it. However, there are two >>>> situations: >>>> 1. During restart, if source options remove a topic or table: sometimes >>>> connectors like MySQL CDC will remove unused splits after restart in >>>> MySqlSourceReader#addSplits [1]. Kafka lacks this, so if the configured >>>> topics change, removed topic's splits are still read. I also want to do the >>>> same thing in Kafka. >>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be >>>> removed after restart. >>>> In these cases, I have to get the assigned splits after >>>> SourceReader#addSplits, rather than get them from SourceOperator >>>> directly. >>>> >>>> > By design, the SplitEnumerator can get the reader information any >>>> time from the `SplitEnumeratorContext.registeredReaders()`. >>>> It looks good. >>>> >>>> Thanks again. >>>> >>>> Best, >>>> Hongshun >>>> >>>> >>>> [1] >>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238 >>>> >>>> >>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]> wrote: >>>> >>>>> Hi Hongshun, >>>>> >>>>> Thanks for the proposal. The current Kafka split assignment algorithm >>>>> does seem to have issues. (I cannot recall why it was implemented this way >>>>> at that time...). >>>>> >>>>> Two quick comments: >>>>> 1. It seems that we don't need the method `SourceReader. >>>>> getAssignedSplits()`. The assigned splits are available in the >>>>> SourceCoordinator upon state restoration and can be put into the >>>>> ReaderRegistrationEvent. >>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int >>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of >>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the SplitEnumerator >>>>> can get the reader information any time from the >>>>> `SplitEnumeratorContext.registeredReaders()`. This also avoids the >>>>> Enumerator implementation to remember the initially assigned splits, if it >>>>> wants to wait until all the readers are registered. This also allow future >>>>> addition of reader information. >>>>> >>>>> Thanks, >>>>> >>>>> Jiangjie (Becket) Qin >>>>> >>>>> >>>>> >>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <[email protected]> >>>>> wrote: >>>>> >>>>>> Anyone familiar with kafka connector can help review this FLIP? I am >>>>>> looking forward for your reply. >>>>>> >>>>>> Best >>>>>> Hongshun >>>>>> >>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]> wrote: >>>>>> >>>>>>> Thanks Hongshun for driving this work. >>>>>>> >>>>>>> >>>>>>> We also suffering the issue in production Kafka restoration usage, >>>>>>> current design is a nice tradeoff and has considered the new Source >>>>>>> implementation details, +1 from my side. >>>>>>> >>>>>>> >>>>>>> Best, >>>>>>> Leonard >>>>>>> >>>>>>> >>>>>>> >>>>>>> > 2025 7月 19 18:59,Hongshun Wang <[email protected]> 写道: >>>>>>> > >>>>>>> > Hi devs, >>>>>>> > >>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with >>>>>>> Global >>>>>>> > Split Assignment Distribution for Balanced Split Assignment] [1], >>>>>>> which >>>>>>> > addresses critical limitations in our current Kafka connector split >>>>>>> > distribution mechanism. >>>>>>> > >>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently >>>>>>> lead to >>>>>>> > uneven Kafka split distribution, causing reader delays and >>>>>>> performance >>>>>>> > bottlenecks. The core issue stems from the enumerator's lack of >>>>>>> visibility >>>>>>> > into post-assignment split distribution. >>>>>>> > >>>>>>> > This flip does two things: >>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should send >>>>>>> > ReaderRegistrationEvent with assigned splits metadata after >>>>>>> startup to >>>>>>> > ensure state consistency. >>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced >>>>>>> splits and >>>>>>> > state awareness during recovery (the enumerator will always choose >>>>>>> the >>>>>>> > least assigned subtask,and reason aslo as follows) >>>>>>> > >>>>>>> > Any additional questions regarding this FLIP? Looking forward to >>>>>>> hearing >>>>>>> > from you. >>>>>>> > >>>>>>> > Best >>>>>>> > Hongshun >>>>>>> > >>>>>>> > >>>>>>> > [1] >>>>>>> > >>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment >>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762 >>>>>>> >>>>>>>
