Thanks Hongshun and Becket for the deep discussion. 

I only have one comment for one API design:

> Deprecate the old addSplitsBack method and use an addSplitsBack with the 
> parameter isReportedByReader instead, because the enumerator can apply 
> different reassignment policies based on the context.

Could we instead introduce a new method such as addSplitsBackOnRecovery with a 
default implementation? That would provide better backward compatibility and 
also make the API easier for developers to understand.
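
To make the suggestion concrete, here is a minimal sketch of what I have in mind. It is not the real Flink interface: SketchSplitEnumerator and LegacyEnumerator are hypothetical stand-ins, and both the signature of addSplitsBackOnRecovery and its default of delegating to addSplitsBack are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, trimmed-down stand-in for a split enumerator (illustration only). */
interface SketchSplitEnumerator<SplitT> {

    /** Existing callback: splits are returned to the enumerator, e.g. after a reader failure. */
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    /**
     * Proposed callback (signature assumed): splits reported by a reader on recovery.
     * Delegating to addSplitsBack by default keeps existing enumerators compiling unchanged.
     */
    default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
        addSplitsBack(splits, subtaskId);
    }
}

/** An existing enumerator that only knows the old method and needs no code change. */
class LegacyEnumerator implements SketchSplitEnumerator<String> {
    final List<String> pending = new ArrayList<>();

    @Override
    public void addSplitsBack(List<String> splits, int subtaskId) {
        pending.addAll(splits);
    }
}
```

An enumerator that wants a different policy on recovery would override only addSplitsBackOnRecovery; everything else keeps the old behavior.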

Best,
Leonard



> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
> 
> Hi Becket,
> 
> I think that's a great idea! I have added the 
> SupportSplitReassignmentOnRecovery interface to this FLIP. If a Source 
> implements this interface, it indicates that the source operator needs to 
> report splits to the enumerator and receive reassignment. [1]
> 
> Best,
> Hongshun
> 
> [1] 
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> 
> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>> Hi Hongshun,
>> 
>> I think the convention for such optional features in Source is via mix-in 
>> interfaces. So instead of adding a method to the SourceReader, maybe we 
>> should introduce an interface SupportSplitReassignmentOnRecovery with this 
>> method. If a Source implementation implements that interface, then the 
>> SourceOperator will check the desired behavior and act accordingly.
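
A minimal sketch of the mix-in check described above, under stated assumptions: SketchSource, PlainSource, ReassigningSource and SketchSourceOperator are hypothetical stand-ins rather than Flink classes, and the interface is modeled here as a pure marker (whether it carries a method is left open in this thread).

```java
/** Hypothetical stand-in for a Source (not the real Flink interface). */
interface SketchSource {}

/** Mix-in named in this thread; modeling it without methods is an assumption. */
interface SupportSplitReassignmentOnRecovery {}

/** A source that keeps today's behavior. */
class PlainSource implements SketchSource {}

/** A source that opts in to split reassignment on recovery. */
class ReassigningSource implements SketchSource, SupportSplitReassignmentOnRecovery {}

final class SketchSourceOperator {
    /**
     * Returns true if restored splits should be reported to the enumerator
     * instead of being added to the reader directly.
     */
    static boolean reportsSplitsOnRecovery(SketchSource source) {
        return source instanceof SupportSplitReassignmentOnRecovery;
    }
}
```

Sources that do not implement the mix-in are unaffected, which is what makes the feature opt-in.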
>> 
>> Thanks,
>> 
>> Jiangjie (Becket) Qin
>> 
>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>> Hi devs,
>>> 
>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and 
>>> suggestions.
>>> 
>>> Best,
>>> Hongshun
>>> 
>>> 
>>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>> 
>>>> Hi Becket,
>>>> Thank you for your detailed feedback. The new contract makes good sense to 
>>>> me and effectively addresses the issues I encountered at the beginning of 
>>>> the design.
>>>> That said, I recommend not reporting splits by default, primarily for 
>>>> compatibility and practical reasons:
>>>> >  For these reasons, we do not expect the Split objects to be huge, and 
>>>> > we are not trying to design for huge Split objects either as they will 
>>>> > have problems even today.
>>>> 1. Not all existing connectors match this rule.
>>>> For example, in the MySQL CDC connector, a binlog split may contain 
>>>> hundreds (or even more) of snapshot split completion records. This state 
>>>> is large and is currently transmitted incrementally through multiple 
>>>> BinlogSplitMetaEvent messages. Since the binlog reader operates with a 
>>>> parallelism of 1, reporting the full split state on recovery could be 
>>>> inefficient or even infeasible.
>>>> For such sources, it would be better to provide a mechanism to skip split 
>>>> reporting during restart until they redesign and reduce the split size.
>>>> 2. Not all enumerators maintain unassigned splits in state.
>>>> Some SplitEnumerator implementations (such as the Kafka connector's) do 
>>>> not track or persistently manage unassigned splits. Requiring them to 
>>>> handle re-registration would add unnecessary complexity. Even if we 
>>>> implement this in the Kafka connector, the connector is currently 
>>>> decoupled from the Flink version, so we also need to make sure older 
>>>> versions remain compatible.
>>>> To address these concerns, I propose introducing a new method: boolean 
>>>> SourceReader#shouldReassignSplitsOnRecovery() with a default 
>>>> implementation returning false. This allows source readers to opt in to 
>>>> split reassignment only when necessary. Since the new contract already 
>>>> places the responsibility for split assignment on the enumerator, not 
>>>> reporting splits by default is a safe and clean default behavior.
>>>> 
>>>> 
>>>> I’ve updated the implementation and the FLIP accordingly [1]. It is quite a big 
>>>> change. In particular, for the Kafka connector, we can now use a pluggable 
>>>> SplitPartitioner to support different split assignment strategies (e.g., 
>>>> default, round-robin).
>>>> 
>>>> Could you please review it when you have a chance?
>>>> 
>>>> Best,
>>>> Hongshun
>>>> 
>>>> [1] 
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> 
>>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>> Hi Hongshun,
>>>>> 
>>>>> I am not too concerned about the transmission cost, because the full 
>>>>> split transmission already has to happen in the initial assignment phase. 
>>>>> And in the future, we probably want to also introduce some kind of 
>>>>> workload balance across source readers, e.g. based on the per-split 
>>>>> throughput or the per-source-reader workload in heterogeneous clusters. 
>>>>> For these reasons, we do not expect the Split objects to be huge, and we 
>>>>> are not trying to design for huge Split objects either as they will have 
>>>>> problems even today.
>>>>> 
>>>>> Good point on the potential split loss, please see the reply below:
>>>>> 
>>>>>> Scenario 2:
>>>>>> 
>>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) 
>>>>>> upon restart.
>>>>>> 2. Before the enumerator receives all reports and performs reassignment, 
>>>>>> a checkpoint is triggered.
>>>>>> 3. Since no splits have been reassigned yet, both readers have empty 
>>>>>> states.
>>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>> The reader registration happens in the SourceOperator.open(), which means 
>>>>> the task is still in the initializing state, therefore the checkpoint 
>>>>> should not be triggered until the enumerator receives all the split 
>>>>> reports.
>>>>> 
>>>>> There is a nuance here. Today, the RPC call from the TM to the JM is 
>>>>> async, so it is possible that SourceOperator.open() has returned but 
>>>>> the enumerator has not yet received the split reports. However, because 
>>>>> the task status update RPC call goes through the same channel as the 
>>>>> split reports call, the task status RPC call will arrive after the split 
>>>>> reports call on the JM side. Therefore, on the JM side, the 
>>>>> SourceCoordinator will always receive the split reports first and the 
>>>>> checkpoint request only afterwards.
>>>>> This "happens-before" relationship is important for guaranteeing 
>>>>> consistent state between the enumerator and the readers.
>>>>> 
>>>>>> Scenario 1:
>>>>>> 
>>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader 
>>>>>> B reports (3 and 4).
>>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 
>>>>>> 2 — not 3 and 4.
>>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are 
>>>>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 
>>>>>> will be permanently lost.
>>>>> This scenario is possible. One solution is to let the enumerator 
>>>>> implementation handle this. That means if the enumerator relies on the 
>>>>> initial split reports from the source readers, it should maintain these 
>>>>> reports by itself. In the above example, the enumerator will need to 
>>>>> remember that 3 and 4 are not assigned and put them into its own state.
>>>>> The current contract is that anything assigned to the SourceReaders is 
>>>>> completely owned by the SourceReaders. Enumerators can remember the 
>>>>> assignments but cannot change them, even when the source reader recovers 
>>>>> / restarts.
>>>>> With this FLIP, the contract becomes that the source readers will return 
>>>>> the ownership of the splits to the enumerator. So the enumerator is 
>>>>> responsible for maintaining these splits, until they are assigned to a 
>>>>> source reader again.
>>>>> 
>>>>> There are other cases where the reader and the enumerator may hold 
>>>>> conflicting information. For example, consider the following sequence:
>>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>> 3. Reader A fails before checkpointing. This is a partial failure, 
>>>>> so only reader A restarts.
>>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the 
>>>>> enumerator. 
>>>>> 5. The enumerator should ignore this report because it has assigned 
>>>>> splits (1 and 2) to reader B.
>>>>> 
>>>>> So with the new contract, the enumerator should be the source of truth 
>>>>> for split ownership.
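
The sequence above can be sketched as follows. This is only an illustration of the "enumerator is the source of truth" rule, not Flink code: SplitOwnershipTracker, its methods, and the use of string split IDs and integer reader IDs are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper an enumerator could keep in its own state (illustration only). */
final class SplitOwnershipTracker {
    // splitId -> reader the enumerator last assigned it to
    private final Map<String, Integer> ownerBySplit = new HashMap<>();

    /** Records that the enumerator assigned the split to the given reader. */
    void assign(String splitId, int readerId) {
        ownerBySplit.put(splitId, readerId);
    }

    /**
     * Handles a split report from a restarted reader. Returns the splits whose
     * ownership goes back to the enumerator; splits that the enumerator has
     * already assigned to a different reader are ignored as stale.
     */
    List<String> acceptReport(int readerId, List<String> reportedSplits) {
        List<String> accepted = new ArrayList<>();
        for (String splitId : reportedSplits) {
            Integer owner = ownerBySplit.get(splitId);
            if (owner == null || owner == readerId) {
                ownerBySplit.remove(splitId); // unassigned until assigned again
                accepted.add(splitId);
            }
        }
        return accepted;
    }
}
```

With reader A as 0 and reader B as 1, the first report from A is accepted in full, while A's repeated report after splits 1 and 2 were assigned to B yields an empty list.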
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Jiangjie (Becket) Qin
>>>>> 
>>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>> Hi Becket,
>>>>>> 
>>>>>> I did consider this approach at the beginning (and it was also mentioned 
>>>>>> in this FLIP), since it would allow more flexibility in reassigning all 
>>>>>> splits. However, there are a few potential issues.
>>>>>> 1. High Transmission Cost
>>>>>> If we pass the full split objects (rather than just split IDs), the data 
>>>>>> size could be significant, leading to high overhead during transmission 
>>>>>> — especially when many splits are involved.
>>>>>> 2. Risk of Split Loss
>>>>>> The risk of split loss exists unless we have a mechanism that ensures a 
>>>>>> checkpoint can only be taken after all the splits are reassigned.
>>>>>> There are scenarios where splits could be lost due to inconsistent state 
>>>>>> handling during recovery:
>>>>>> 
>>>>>> Scenario 1:
>>>>>> 
>>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader 
>>>>>> B reports (3 and 4).
>>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 
>>>>>> 2, not 3 and 4.
>>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are 
>>>>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 
>>>>>> will be permanently lost.
>>>>>> 
>>>>>> Scenario 2:
>>>>>> 
>>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) 
>>>>>> upon restart.
>>>>>> 2. Before the enumerator receives all reports and performs reassignment, 
>>>>>> a checkpoint is triggered.
>>>>>> 3. Since no splits have been reassigned yet, both readers have empty 
>>>>>> states.
>>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>> 
>>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>> 
>>>>>> Best
>>>>>> Hongshun
>>>>>> 
>>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>> Hi Hongshun,
>>>>>>> 
>>>>>>> The steps sound reasonable to me in general. In terms of the updated 
>>>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple. 
>>>>>>> One alternative way to achieve this behavior is the following:
>>>>>>> 
>>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends 
>>>>>>> ReaderRegistrationEvent with the currently assigned splits to the 
>>>>>>> enumerator. It does not add these splits to the SourceReader.
>>>>>>> 2. The enumerator will always use 
>>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not the 
>>>>>>> response to the ReaderRegistrationEvent; this allows async split 
>>>>>>> assignment in case the enumerator wants to wait until all the readers 
>>>>>>> are registered).
>>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it 
>>>>>>> receives the AddSplitEvent from the enumerator.
>>>>>>> 
>>>>>>> This protocol has a few benefits: 
>>>>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>>>>> 2. simplicity: there is only one way to assign splits.
>>>>>>> 
>>>>>>> So we only need one interface change:
>>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can 
>>>>>>> access it.
>>>>>>> and one behavior change:
>>>>>>> - The SourceOperator should stop assigning splits to the SourceReader 
>>>>>>> from state restoration, and only do so when it receives an 
>>>>>>> AddSplitEvent from the enumerator.
>>>>>>> 
>>>>>>> The enumerator story is also simple:
>>>>>>> 1. Receive some kind of notification (new partition, new reader, etc)
>>>>>>> 2. look at the reader information (in the enumerator context or 
>>>>>>> self-maintained state)
>>>>>>> 3. assign splits via the enumerator context.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jiangjie (Becket) Qin
>>>>>>> 
>>>>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>> Hi Becket,
>>>>>>>> Thanks for your advice — I’ve quickly learned a lot about the reader’s 
>>>>>>>> design principle. It’s really interesting!
>>>>>>>> 
>>>>>>>> > One principle we want to follow is that the enumerator should be the 
>>>>>>>> > brain doing the splits assignment, while the source readers read 
>>>>>>>> > from the assigned splits. So we want to avoid the case where the 
>>>>>>>> > SourceReader ignores the split assignment. 
>>>>>>>> 
>>>>>>>> It appears that MySQL CDC currently bypasses this principle by 
>>>>>>>> proactively removing unused splits directly in the SourceReader. This 
>>>>>>>> may be due to the lack of built-in framework support for such cleanup, 
>>>>>>>> forcing connectors to handle it manually. However, this responsibility 
>>>>>>>> ideally belongs in the framework.
>>>>>>>> 
>>>>>>>> With this FLIP, we propose a redesigned mechanism that centralizes 
>>>>>>>> split cleanup logic in the SplitEnumerator, allowing connectors like 
>>>>>>>> MySQL CDC to eventually adopt it (@Leonard, CC).
>>>>>>>> 
>>>>>>>> To achieve this, we must carefully manage state consistency during 
>>>>>>>> startup and recovery. The proposed approach is as follows:
>>>>>>>> 1. Reader Registration with Deferred Assignment
>>>>>>>> When a reader starts (SourceOperator#open), it sends a 
>>>>>>>> ReaderRegistrationEvent to the SplitEnumerator, including its 
>>>>>>>> previously assigned splits (restored from state). However, these 
>>>>>>>> splits are not yet assigned to the reader. The SourceOperator is 
>>>>>>>> placed in a PENDING state.
>>>>>>>> 2. Prevent State Pollution During Registration
>>>>>>>> While in the PENDING state, SourceOperator#snapshotState will not 
>>>>>>>> update the operator state. This prevents empty or outdated reader 
>>>>>>>> state (e.g., with removed splits) from polluting the checkpoint.
>>>>>>>> 3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator 
>>>>>>>> removes any splits that are no longer valid (e.g., due to removed 
>>>>>>>> topics or tables) and returns the list of remaining valid split IDs to 
>>>>>>>> the reader via a ReaderRegistrationACKEvent.
>>>>>>>> For backward compatibility, the default behavior is to return all 
>>>>>>>> split IDs (i.e., no filtering).
>>>>>>>> 4. Finalize Registration and Resume Normal Operation
>>>>>>>> When the SourceOperator receives the ReaderRegistrationACKEvent, it 
>>>>>>>> assigns the confirmed splits to the reader and transitions its state 
>>>>>>>> to REGISTERED. From this point onward, SourceOperator#snapshotState 
>>>>>>>> can safely update the operator state.
>>>>>>>> 
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>> 
>>>>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>> SourceCoordinator doesn't store splits that have already been 
>>>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits 
>>>>>>>>>> only for this checkpoint, which will be removed after checkpoint. 
>>>>>>>>>> Maybe you mean SourceOperator?
>>>>>>>>> Yes, I meant SourceOperator.
>>>>>>>>> 
>>>>>>>>>> At the beginning, I also thought about using it. However, there are 
>>>>>>>>>> two situations:
>>>>>>>>>> 1. During restart, if source options remove a topic or table: 
>>>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after 
>>>>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if 
>>>>>>>>>> the configured topics change, removed topic's splits are still read. 
>>>>>>>>>> I also want to do the same thing in Kafka.
>>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be 
>>>>>>>>>> removed after restart.
>>>>>>>>>> In these cases, I have to get the assigned splits after 
>>>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator 
>>>>>>>>>> directly.
>>>>>>>>> 
>>>>>>>>> One principle we want to follow is that the enumerator should be the 
>>>>>>>>> brain doing the splits assignment, while the source readers read from 
>>>>>>>>> the assigned splits. So we want to avoid the case where the 
>>>>>>>>> SourceReader ignores the split assignment. Given this principle:
>>>>>>>>> For case 1, if there is a subscription change, it might be better to 
>>>>>>>>> hold back calling SourceReader.addSplits() until an assignment is 
>>>>>>>>> confirmed by the Enumerator. In fact, this might be a good default 
>>>>>>>>> behavior regardless of whether there is a subscription change.
>>>>>>>>> For case 2: if a bounded split is finished, the 
>>>>>>>>> SourceReader.snapshotState() will not contain that split. So upon 
>>>>>>>>> restoration, those splits should not appear, right?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> 
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>> 
>>>>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>> Hi Becket,
>>>>>>>>>> 
>>>>>>>>>> Thank you very much for your advice; it helped me a lot.
>>>>>>>>>> >  It seems that we don't need the method 
>>>>>>>>>> > `SourceReader.getAssignedSplits()`. The assigned splits are 
>>>>>>>>>> > available in the SourceCoordinator upon state restoration.
>>>>>>>>>> 
>>>>>>>>>>  SourceCoordinator doesn't store splits that have already been 
>>>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits 
>>>>>>>>>> only for this checkpoint, which will be removed after checkpoint. 
>>>>>>>>>> Maybe you mean SourceOperator?
>>>>>>>>>> 
>>>>>>>>>> At the beginning, I also thought about using it. However, there are 
>>>>>>>>>> two situations:
>>>>>>>>>> 1. During restart, if source options remove a topic or table: 
>>>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after 
>>>>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if 
>>>>>>>>>> the configured topics change, removed topic's splits are still read. 
>>>>>>>>>> I also want to do the same thing in Kafka.
>>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be 
>>>>>>>>>> removed after restart.
>>>>>>>>>> In these cases, I have to get the assigned splits after 
>>>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator 
>>>>>>>>>> directly.
>>>>>>>>>> 
>>>>>>>>>> >  By design, the SplitEnumerator can get the reader information any 
>>>>>>>>>> > time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>>>> It looks good.
>>>>>>>>>> 
>>>>>>>>>> Thanks again.
>>>>>>>>>> 
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> [1] 
>>>>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>> 
>>>>>>>>>>> Thanks for the proposal. The current Kafka split assignment 
>>>>>>>>>>> algorithm does seem to have issues. (I cannot recall why it was 
>>>>>>>>>>> implemented this way at that time...).
>>>>>>>>>>> 
>>>>>>>>>>> Two quick comments:
>>>>>>>>>>> 1. It seems that we don't need the method 
>>>>>>>>>>> `SourceReader.getAssignedSplits()`. The assigned splits are 
>>>>>>>>>>> available in the SourceCoordinator upon state restoration and can 
>>>>>>>>>>> be put into the ReaderRegistrationEvent.
>>>>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int 
>>>>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of 
>>>>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the 
>>>>>>>>>>> SplitEnumerator can get the reader information any time from the 
>>>>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also saves the 
>>>>>>>>>>> Enumerator implementation from having to remember the initially 
>>>>>>>>>>> assigned splits if it wants to wait until all the readers are 
>>>>>>>>>>> registered. It also allows more reader information to be added in 
>>>>>>>>>>> the future.
>>>>>>>>>>> 
>>>>>>>>>>> Thanks,
>>>>>>>>>>> 
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> Could anyone familiar with the Kafka connector help review this 
>>>>>>>>>>>> FLIP? I am looking forward to your reply.
>>>>>>>>>>>> 
>>>>>>>>>>>> Best
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>> 
>>>>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]> wrote:
>>>>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> We are also suffering from this issue when restoring Kafka jobs 
>>>>>>>>>>>>> in production. The current design is a nice tradeoff and takes the 
>>>>>>>>>>>>> new Source implementation details into account. +1 from my side.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Leonard
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > Hi devs,
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator 
>>>>>>>>>>>>> > with Global
>>>>>>>>>>>>> > Split Assignment Distribution for Balanced Split Assignment] 
>>>>>>>>>>>>> > [1], which
>>>>>>>>>>>>> > addresses critical limitations in our current Kafka connector 
>>>>>>>>>>>>> > split
>>>>>>>>>>>>> > distribution mechanism.
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently 
>>>>>>>>>>>>> > lead to
>>>>>>>>>>>>> > uneven Kafka split distribution, causing reader delays and 
>>>>>>>>>>>>> > performance
>>>>>>>>>>>>> > bottlenecks. The core issue stems from the enumerator's lack of 
>>>>>>>>>>>>> > visibility
>>>>>>>>>>>>> > into post-assignment split distribution.
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should 
>>>>>>>>>>>>> > send
>>>>>>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after 
>>>>>>>>>>>>> > startup to
>>>>>>>>>>>>> > ensure state consistency.
>>>>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced 
>>>>>>>>>>>>> > splits and
>>>>>>>>>>>>> > state awareness during recovery (the enumerator will always 
>>>>>>>>>>>>> > choose the
>>>>>>>>>>>>> > least-assigned subtask; the reasoning is also given in the FLIP)
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward 
>>>>>>>>>>>>> > to hearing
>>>>>>>>>>>>> > from you.
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > Best
>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > 
>>>>>>>>>>>>> > [1]
>>>>>>>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
>>>>>>>>>>>>> 
>>> 
