Thanks, Hongshun and Becket, for the in-depth discussion. I have just one comment, on one API design:
> Deprecate the old addSplitsBack method and use an addSplitsBack with an
> isReportedByReader parameter instead, because the enumerator can apply
> different reassignment policies based on the context.

Could we instead introduce a new method, such as addSplitsBackOnRecovery, with a default implementation? This would provide better backward compatibility and would also be easier for developers to understand.

Best,
Leonard

> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>
> Hi Becket,
>
> I think that's a great idea! I have added the
> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source that
> implements this interface indicates that the source operator needs to report
> splits to the enumerator and receive reassignment. [1]
>
> Best,
> Hongshun
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>
> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>> Hi Hongshun,
>>
>> I think the convention for such optional features in Source is via mix-in
>> interfaces. So instead of adding a method to the SourceReader, maybe we
>> should introduce an interface SupportSplitReassignmentOnRecovery with this
>> method. If a Source implementation implements that interface, then the
>> SourceOperator will check the desired behavior and act accordingly.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>> Hi devs,
>>>
>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and
>>> suggestions.
>>>
>>> Best,
>>> Hongshun
>>>
>>>
>>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>
>>>> Hi Becket,
>>>> Thank you for your detailed feedback. The new contract makes good sense to
>>>> me and effectively addresses the issues I encountered at the beginning of
>>>> the design.
>>>> That said, I recommend not reporting splits by default, primarily for
>>>> compatibility and practical reasons:
>>>> > For these reasons, we do not expect the Split objects to be huge, and
>>>> > we are not trying to design for huge Split objects either as they will
>>>> > have problems even today.
>>>> 1. Not all existing connectors match this rule.
>>>> For example, in the MySQL CDC connector, a binlog split may contain hundreds
>>>> (or even more) snapshot split completion records. This state is large and
>>>> is currently transmitted incrementally through multiple
>>>> BinlogSplitMetaEvent messages. Since the binlog reader operates with
>>>> single parallelism, reporting the full split state on recovery could be
>>>> inefficient or even infeasible.
>>>> For such sources, it would be better to provide a mechanism to skip split
>>>> reporting during restart until they redesign and reduce the split size.
>>>> 2. Not all enumerators maintain unassigned splits in state.
>>>> Some SplitEnumerator implementations (such as the Kafka connector's) do not
>>>> track or persistently manage unassigned splits. Requiring them to handle
>>>> re-registration would add unnecessary complexity. Even if we implement this
>>>> in the Kafka connector, the Kafka connector is currently decoupled from the
>>>> Flink version, so we also need to make sure older versions stay compatible.
>>>> To address these concerns, I propose introducing a new method: boolean
>>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
>>>> implementation returning false. This allows source readers to opt in to
>>>> split reassignment only when necessary. Since the new contract already
>>>> places the responsibility for split assignment on the enumerator, not
>>>> reporting splits by default is a safe and clean default behavior.
>>>>
>>>>
>>>> I’ve updated the implementation and the FLIP accordingly [1]. It is quite a
>>>> big change.
>>>> In particular, for the Kafka connector, we can now use a pluggable
>>>> SplitPartitioner to support different split assignment strategies (e.g.,
>>>> default, round-robin).
>>>>
>>>> Could you please review it when you have a chance?
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>
>>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>> Hi Hongshun,
>>>>>
>>>>> I am not too concerned about the transmission cost, because the full
>>>>> split transmission has to happen in the initial assignment phase already.
>>>>> And in the future, we probably want to also introduce some kind of
>>>>> workload balance across source readers, e.g. based on the per-split
>>>>> throughput or the per-source-reader workload in heterogeneous clusters.
>>>>> For these reasons, we do not expect the Split objects to be huge, and we
>>>>> are not trying to design for huge Split objects either as they will have
>>>>> problems even today.
>>>>>
>>>>> Good point on the potential split loss, please see the reply below:
>>>>>
>>>>>> Scenario 2:
>>>>>>
>>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
>>>>>> upon restart.
>>>>>> 2. Before the enumerator receives all reports and performs reassignment,
>>>>>> a checkpoint is triggered.
>>>>>> 3. Since no splits have been reassigned yet, both readers have empty
>>>>>> states.
>>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>> The reader registration happens in the SourceOperator.open(), which means
>>>>> the task is still in the initializing state, therefore the checkpoint
>>>>> should not be triggered until the enumerator receives all the split
>>>>> reports.
>>>>>
>>>>> There is a nuance here. Today, the RPC call from the TM to the JM is
>>>>> async.
>>>>> So it is possible that the SourceOperator.open() has returned, but
>>>>> the enumerator has not received the split reports. However, because the
>>>>> task status update RPC call goes through the same channel as the split
>>>>> reports call, the task status RPC call will happen after the split reports
>>>>> call on the JM side. Therefore, on the JM side, the SourceCoordinator
>>>>> will always first receive the split reports, then receive the checkpoint
>>>>> request.
>>>>> This "happens-before" relationship is kind of important to guarantee the
>>>>> consistent state between enumerator and readers.
>>>>>
>>>>>> Scenario 1:
>>>>>>
>>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader
>>>>>> B reports (3 and 4).
>>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and
>>>>>> 2, not 3 and 4.
>>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are
>>>>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
>>>>>> will be permanently lost.
>>>>> This scenario is possible. One solution is to let the enumerator
>>>>> implementation handle this. That means if the enumerator relies on the
>>>>> initial split reports from the source readers, it should maintain these
>>>>> reports by itself. In the above example, the enumerator will need to
>>>>> remember that 3 and 4 are not assigned and put them into its own state.
>>>>> The current contract is that anything assigned to the SourceReaders is
>>>>> completely owned by the SourceReaders. Enumerators can remember the
>>>>> assignments but cannot change them, even when the source reader recovers
>>>>> / restarts.
>>>>> With this FLIP, the contract becomes that the source readers will return
>>>>> the ownership of the splits to the enumerator. So the enumerator is
>>>>> responsible for maintaining these splits, until they are assigned to a
>>>>> source reader again.
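
To check my own understanding of the contract change described above, here is a minimal sketch of an enumerator that keeps reported splits in its own state until it reassigns them. The class name, the method names, and the use of plain String split IDs are all hypothetical; this is not the Flink SplitEnumerator interface, only an illustration of the ownership rule.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch; this is not Flink's SplitEnumerator interface.
final class OwnershipTrackingEnumerator {
    // Splits handed back by readers and not yet reassigned: owned by the enumerator.
    private final Set<String> unassigned = new TreeSet<>();
    // Splits the enumerator has already handed out, keyed by split id.
    private final Map<String, Integer> ownerBySplit = new TreeMap<>();

    // A reader reports the splits it restored from its own state.
    void onSplitsReported(int subtaskId, List<String> splitIds) {
        for (String id : splitIds) {
            // Splits that were already given to some reader are ignored:
            // the enumerator's view wins over the reader's report.
            if (!ownerBySplit.containsKey(id)) {
                unassigned.add(id);
            }
        }
    }

    // Hand a split held by the enumerator to a reader.
    void assign(String splitId, int subtaskId) {
        if (!unassigned.remove(splitId)) {
            throw new IllegalStateException("Split is not held by the enumerator: " + splitId);
        }
        ownerBySplit.put(splitId, subtaskId);
    }

    // What the enumerator would write into its own checkpoint state.
    List<String> snapshotUnassigned() {
        return new ArrayList<>(unassigned);
    }
}
```

With the numbers from Scenario 1, splits 3 and 4 stay in the enumerator snapshot after only 1 and 2 are reassigned, so a restart from that checkpoint would not lose them.
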
>>>>>
>>>>> There are other cases where there may be conflicting information between
>>>>> reader and enumerator. For example, consider the following sequence:
>>>>> 1. reader A reports splits (1 and 2) upon restart.
>>>>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>> 3. reader A fails before checkpointing. And this is a partial failure,
>>>>> so only reader A restarts.
>>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the
>>>>> enumerator.
>>>>> 5. The enumerator should ignore this report because it has assigned
>>>>> splits (1 and 2) to reader B.
>>>>>
>>>>> So with the new contract, the enumerator should be the source of truth
>>>>> for split ownership.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>> Hi Becket,
>>>>>>
>>>>>> I did consider this approach at the beginning (and it was also mentioned
>>>>>> in this FLIP), since it would allow more flexibility in reassigning all
>>>>>> splits. However, there are a few potential issues.
>>>>>> 1. High Transmission Cost
>>>>>> If we pass the full split objects (rather than just split IDs), the data
>>>>>> size could be significant, leading to high overhead during transmission,
>>>>>> especially when many splits are involved.
>>>>>> 2. Risk of Split Loss
>>>>>> Risk of split loss exists unless we have a mechanism to make sure that a
>>>>>> checkpoint can only happen after all the splits are reassigned.
>>>>>> There are scenarios where splits could be lost due to inconsistent state
>>>>>> handling during recovery:
>>>>>>
>>>>>> Scenario 1:
>>>>>>
>>>>>> Upon restart, Reader A reports assigned splits (1 and 2), and Reader B
>>>>>> reports (3 and 4).
>>>>>> The enumerator receives these reports but only reassigns splits 1 and 2,
>>>>>> not 3 and 4.
>>>>>> A checkpoint or savepoint is then triggered.
>>>>>> Only splits 1 and 2 are
>>>>>> recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>> If the job is later restarted from this checkpoint, splits 3 and 4 will
>>>>>> be permanently lost.
>>>>>>
>>>>>> Scenario 2:
>>>>>> Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon
>>>>>> restart.
>>>>>> Before the enumerator receives all reports and performs reassignment, a
>>>>>> checkpoint is triggered.
>>>>>> Since no splits have been reassigned yet, both readers have empty states.
>>>>>> When restarting from this checkpoint, all four splits are lost.
>>>>>>
>>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>>
>>>>>> Best
>>>>>> Hongshun
>>>>>>
>>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>> Hi Hongshun,
>>>>>>>
>>>>>>> The steps sound reasonable to me in general. In terms of the updated
>>>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple.
>>>>>>> One alternative way to achieve this behavior is the following:
>>>>>>>
>>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>>>>> ReaderRegistrationEvent with the currently assigned splits to the
>>>>>>> enumerator. It does not add these splits to the SourceReader.
>>>>>>> 2. The enumerator will always use the
>>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits. (not via
>>>>>>> the response of the ReaderRegistrationEvent, this allows async split
>>>>>>> assignment in case the enumerator wants to wait until all the readers
>>>>>>> are registered)
>>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it
>>>>>>> receives the AddSplitEvent from the enumerator.
>>>>>>>
>>>>>>> This protocol has a few benefits:
>>>>>>> 1. it basically allows arbitrary split reassignment upon restart
>>>>>>> 2. simplicity: there is only one way to assign splits.
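
The three protocol steps above can be sketched roughly as follows. Everything in this block is hypothetical and simplified: the Operator and Enumerator classes stand in for SourceOperator and the enumerator, the BiConsumer stands in for the enumerator context's split assignment call, splits are plain Strings, and the round-robin plan is just one possible policy. It only covers the first full registration round.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical sketch of the three steps; none of these types are Flink classes.
final class RegistrationProtocolSketch {

    /** Operator side: keeps restored splits away from the reader until they are assigned. */
    static final class Operator {
        final int subtaskId;
        final List<String> restoredSplits;                    // restored from state, sent in step 1
        final List<String> readerSplits = new ArrayList<>();  // what the reader actually reads

        Operator(int subtaskId, List<String> restoredSplits) {
            this.subtaskId = subtaskId;
            this.restoredSplits = restoredSplits;
        }

        // Step 3: the only path by which splits reach the reader.
        void onAddSplits(List<String> splits) {
            readerSplits.addAll(splits);
        }
    }

    /** Enumerator side: waits for every reader, then assigns through a single callback. */
    static final class Enumerator {
        private final int parallelism;
        private final BiConsumer<Integer, List<String>> assignSplits;
        private final Map<Integer, List<String>> reported = new TreeMap<>();

        Enumerator(int parallelism, BiConsumer<Integer, List<String>> assignSplits) {
            this.parallelism = parallelism;
            this.assignSplits = assignSplits;
        }

        // Step 1, receiving side: record the report; no assignment is returned here.
        void onReaderRegistered(int subtaskId, List<String> restoredSplits) {
            reported.put(subtaskId, new ArrayList<>(restoredSplits));
            if (reported.size() < parallelism) {
                return; // wait until all readers have registered
            }
            // Step 2: build a round-robin plan over all reported splits and assign it.
            List<String> all = new ArrayList<>();
            reported.values().forEach(all::addAll);
            Map<Integer, List<String>> plan = new TreeMap<>();
            for (int i = 0; i < all.size(); i++) {
                plan.computeIfAbsent(i % parallelism, k -> new ArrayList<>()).add(all.get(i));
            }
            plan.forEach(assignSplits);
            reported.clear();
        }
    }
}
```

In this sketch a reader that restored three splits and a reader that restored one end up with two each, which is the kind of rebalancing on restart that the protocol is meant to allow.
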
>>>>>>>
>>>>>>> So we only need one interface change:
>>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can
>>>>>>> access it.
>>>>>>> and one behavior change:
>>>>>>> - The SourceOperator should stop assigning splits to the reader from
>>>>>>> state restoration, but only do that when it receives AddSplitEvent from
>>>>>>> the enumerator.
>>>>>>>
>>>>>>> The enumerator story is also simple:
>>>>>>> 1. Receive some kind of notification (new partition, new reader, etc)
>>>>>>> 2. look at the reader information (in the enumerator context or
>>>>>>> self-maintained state)
>>>>>>> 3. assign splits via the enumerator context.
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Thu, Aug 7, 2025 at 1:31 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>> Hi Becket,
>>>>>>>> Thanks for your advice. I’ve quickly learned a lot about the reader’s
>>>>>>>> design principle. It’s really interesting!
>>>>>>>>
>>>>>>>> > One principle we want to follow is that the enumerator should be the
>>>>>>>> > brain doing the splits assignment, while the source readers read
>>>>>>>> > from the assigned splits. So we want to avoid the case where the
>>>>>>>> > SourceReader ignores the split assignment.
>>>>>>>>
>>>>>>>> It appears that MySQL CDC currently bypasses this principle by
>>>>>>>> proactively removing unused splits directly in the SourceReader. This
>>>>>>>> may be due to the lack of built-in framework support for such cleanup,
>>>>>>>> forcing connectors to handle it manually. However, this responsibility
>>>>>>>> ideally belongs in the framework.
>>>>>>>>
>>>>>>>> With this FLIP, we propose a redesigned mechanism that centralizes
>>>>>>>> split cleanup logic in the SplitEnumerator, allowing connectors like
>>>>>>>> MySQL CDC to eventually adopt it (@Leonard, CC).
>>>>>>>>
>>>>>>>> To achieve this, we must carefully manage state consistency during
>>>>>>>> startup and recovery.
>>>>>>>> The proposed approach is as follows:
>>>>>>>> 1. Reader Registration with Deferred Assignment
>>>>>>>> When a reader starts (SourceOperator#open), it sends a
>>>>>>>> ReaderRegistrationEvent to the SplitEnumerator, including its
>>>>>>>> previously assigned splits (restored from state). However, these
>>>>>>>> splits are not yet assigned to the reader. The SourceOperator is
>>>>>>>> placed in a PENDING state.
>>>>>>>> 2. Prevent State Pollution During Registration
>>>>>>>> While in the PENDING state, SourceOperator#snapshotState will not
>>>>>>>> update the operator state. This prevents empty or outdated reader
>>>>>>>> state (e.g., with removed splits) from polluting the checkpoint.
>>>>>>>> 3. Enumerator Performs Split Cleanup and Acknowledges
>>>>>>>> Upon receiving the ReaderRegistrationEvent, the SplitEnumerator
>>>>>>>> removes any splits that are no longer valid (e.g., due to removed
>>>>>>>> topics or tables) and returns the list of remaining valid split IDs to
>>>>>>>> the reader via a ReaderRegistrationACKEvent.
>>>>>>>> For backward compatibility, the default behavior is to return all
>>>>>>>> split IDs (i.e., no filtering).
>>>>>>>> 4. Finalize Registration and Resume Normal Operation
>>>>>>>> When the SourceOperator receives the ReaderRegistrationACKEvent, it
>>>>>>>> assigns the confirmed splits to the reader and transitions its state
>>>>>>>> to REGISTERED. From this point onward, SourceOperator#snapshotState
>>>>>>>> can safely update the operator state.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> On Thu, Aug 7, 2025 at 1:57 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits
>>>>>>>>>> only for this checkpoint, which will be removed after checkpoint.
>>>>>>>>>> Maybe you mean SourceOperator?
>>>>>>>>> Yes, I meant SourceOperator.
>>>>>>>>>
>>>>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>>>> two situations:
>>>>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after
>>>>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if
>>>>>>>>>> the configured topics change, removed topic's splits are still read.
>>>>>>>>>> I also want to do the same thing in Kafka.
>>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>>>> removed after restart.
>>>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>>>>> directly.
>>>>>>>>>
>>>>>>>>> One principle we want to follow is that the enumerator should be the
>>>>>>>>> brain doing the splits assignment, while the source readers read from
>>>>>>>>> the assigned splits. So we want to avoid the case where the
>>>>>>>>> SourceReader ignores the split assignment. Given this principle:
>>>>>>>>> For case 1, if there is a subscription change, it might be better to
>>>>>>>>> hold back calling SourceReader.addSplits() until an assignment is
>>>>>>>>> confirmed by the Enumerator. In fact, this might be a good default
>>>>>>>>> behavior regardless of whether there is a subscription change.
>>>>>>>>> For case 2, if a bounded split is finished, the
>>>>>>>>> SourceReader.snapshotState() will not contain that split. So upon
>>>>>>>>> restoration, those splits should not appear, right?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>> On Wed, Aug 6, 2025 at 5:19 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>> Hi Becket,
>>>>>>>>>>
>>>>>>>>>> Thank you very much for your advice; it helped me a lot.
>>>>>>>>>> > It seems that we don't need the method
>>>>>>>>>> > `SourceReader.getAssignedSplits()`.
>>>>>>>>>> > The assigned splits are
>>>>>>>>>> > available in the SourceCoordinator upon state restoration.
>>>>>>>>>>
>>>>>>>>>> SourceCoordinator doesn't store splits that have already been
>>>>>>>>>> assigned to readers, and SplitAssignmentTracker stores the splits
>>>>>>>>>> only for this checkpoint, which will be removed after checkpoint.
>>>>>>>>>> Maybe you mean SourceOperator?
>>>>>>>>>>
>>>>>>>>>> At the beginning, I also thought about using it. However, there are
>>>>>>>>>> two situations:
>>>>>>>>>> 1. During restart, if source options remove a topic or table:
>>>>>>>>>> sometimes connectors like MySQL CDC will remove unused splits after
>>>>>>>>>> restart in MySqlSourceReader#addSplits [1]. Kafka lacks this, so if
>>>>>>>>>> the configured topics change, removed topic's splits are still read.
>>>>>>>>>> I also want to do the same thing in Kafka.
>>>>>>>>>> 2. In Kafka or MySQL CDC, some bounded splits, if finished, can be
>>>>>>>>>> removed after restart.
>>>>>>>>>> In these cases, I have to get the assigned splits after
>>>>>>>>>> SourceReader#addSplits, rather than get them from SourceOperator
>>>>>>>>>> directly.
>>>>>>>>>>
>>>>>>>>>> > By design, the SplitEnumerator can get the reader information any
>>>>>>>>>> > time from the `SplitEnumeratorContext.registeredReaders()`.
>>>>>>>>>> It looks good.
>>>>>>>>>>
>>>>>>>>>> Thanks again.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://github.com/apache/flink-cdc/blob/42f91a864e329c00959828fe0ca4f1e9e8e1de75/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReader.java#L238
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 5, 2025 at 2:35 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the proposal. The current Kafka split assignment
>>>>>>>>>>> algorithm does seem to have issues.
>>>>>>>>>>> (I cannot recall why it was
>>>>>>>>>>> implemented this way at that time...).
>>>>>>>>>>>
>>>>>>>>>>> Two quick comments:
>>>>>>>>>>> 1. It seems that we don't need the method
>>>>>>>>>>> `SourceReader.getAssignedSplits()`. The assigned splits are
>>>>>>>>>>> available in the SourceCoordinator upon state restoration and can
>>>>>>>>>>> be put into the ReaderRegistrationEvent.
>>>>>>>>>>> 2. Instead of adding the method `SplitEnumerator.addReader(int
>>>>>>>>>>> subtaskId, List<SplitT> assignedSplits)`, add a new field of
>>>>>>>>>>> `InitialSplitAssignment` to the ReaderInfo. By design, the
>>>>>>>>>>> SplitEnumerator can get the reader information any time from the
>>>>>>>>>>> `SplitEnumeratorContext.registeredReaders()`. This also saves the
>>>>>>>>>>> Enumerator implementation from having to remember the initially
>>>>>>>>>>> assigned splits, if it wants to wait until all the readers are
>>>>>>>>>>> registered. This also allows future addition of reader information.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 4, 2025 at 8:39 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> Can anyone familiar with the Kafka connector help review this FLIP?
>>>>>>>>>>>> I am looking forward to your reply.
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Jul 24, 2025 at 8:13 PM Leonard Xu <[email protected]> wrote:
>>>>>>>>>>>>> Thanks Hongshun for driving this work.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> We are also suffering from this issue in production Kafka
>>>>>>>>>>>>> restoration usage. The current design is a nice tradeoff and has
>>>>>>>>>>>>> considered the new Source implementation details. +1 from my side.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Leonard
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> > On Jul 19, 2025, at 18:59, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Hi devs,
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > I'd like to initiate a discussion about [FLIP-537: Enumerator with Global
>>>>>>>>>>>>> > Split Assignment Distribution for Balanced Split Assignment] [1], which
>>>>>>>>>>>>> > addresses critical limitations in our current Kafka connector split
>>>>>>>>>>>>> > distribution mechanism.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > As documented in [FLINK-31762] [2], several scenarios currently lead to
>>>>>>>>>>>>> > uneven Kafka split distribution, causing reader delays and performance
>>>>>>>>>>>>> > bottlenecks. The core issue stems from the enumerator's lack of visibility
>>>>>>>>>>>>> > into post-assignment split distribution.
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > This FLIP does two things:
>>>>>>>>>>>>> > 1. ReaderRegistrationEvent Enhancement: SourceOperator should send
>>>>>>>>>>>>> > ReaderRegistrationEvent with assigned splits metadata after startup to
>>>>>>>>>>>>> > ensure state consistency.
>>>>>>>>>>>>> > 2. Implementation in the Kafka connector to resolve imbalanced splits and
>>>>>>>>>>>>> > state awareness during recovery (the enumerator will always choose the
>>>>>>>>>>>>> > least assigned subtask, and reason also as follows)
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Any additional questions regarding this FLIP? Looking forward to hearing
>>>>>>>>>>>>> > from you.
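
For reference, the "always choose the least assigned subtask" rule from item 2 above could look roughly like the sketch below. The class and method names are hypothetical, splits are plain Strings, and breaking ties by the lowest subtask id is my own assumption; this is not the Kafka connector's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a "least assigned subtask" policy; not the Kafka connector's code.
final class LeastAssignedSubtaskPicker {
    // subtaskId -> splits currently assigned to it; TreeMap keeps iteration order stable.
    private final Map<Integer, List<String>> splitsBySubtask = new TreeMap<>();

    LeastAssignedSubtaskPicker(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        for (int i = 0; i < parallelism; i++) {
            splitsBySubtask.put(i, new ArrayList<>());
        }
    }

    // Record what a reader already holds, e.g. as reported at registration.
    void recordExisting(int subtaskId, List<String> splitIds) {
        splitsBySubtask.get(subtaskId).addAll(splitIds);
    }

    // Assign a new split to the subtask with the fewest splits (lowest id wins ties).
    int assign(String splitId) {
        int target = -1;
        int smallest = Integer.MAX_VALUE;
        for (Map.Entry<Integer, List<String>> e : splitsBySubtask.entrySet()) {
            if (e.getValue().size() < smallest) {
                smallest = e.getValue().size();
                target = e.getKey();
            }
        }
        splitsBySubtask.get(target).add(splitId);
        return target;
    }
}
```

The point of the sketch is only that the policy needs the per-subtask distribution as input, which is exactly the visibility the enumerator lacks today.
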
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > Best
>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>> >
>>>>>>>>>>>>> >
>>>>>>>>>>>>> > [1]
>>>>>>>>>>>>> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>> > [2] https://issues.apache.org/jira/browse/FLINK-31762
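
P.S. To make the addSplitsBackOnRecovery suggestion at the top of this mail concrete, here is a minimal sketch of what I have in mind. SketchEnumerator is a hypothetical stand-in and not the real SplitEnumerator interface; the signature of the new method and the choice to have the default delegate to addSplitsBack are assumptions on my side, open for discussion.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the enumerator interface; not the real Flink SplitEnumerator.
interface SketchEnumerator<SplitT> {
    // Existing callback: splits handed back after a reader failure. Left untouched.
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    // Suggested addition (signature assumed). The default falls back to the existing
    // callback, so enumerators written before the change keep their behavior.
    default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
        addSplitsBack(splits, subtaskId);
    }
}

// An existing enumerator that knows nothing about the new method still compiles and works.
final class LegacyEnumerator implements SketchEnumerator<String> {
    final List<String> pending = new ArrayList<>();

    @Override
    public void addSplitsBack(List<String> splits, int subtaskId) {
        pending.addAll(splits);
    }
}

// A newer enumerator overrides the new method to apply a different policy on recovery.
final class RecoveryAwareEnumerator implements SketchEnumerator<String> {
    final List<String> pending = new ArrayList<>();
    final List<String> reportedOnRecovery = new ArrayList<>();

    @Override
    public void addSplitsBack(List<String> splits, int subtaskId) {
        pending.addAll(splits);
    }

    @Override
    public void addSplitsBackOnRecovery(List<String> splits, int subtaskId) {
        // Kept separate so these splits can be redistributed globally later.
        reportedOnRecovery.addAll(splits);
    }
}
```

Compared with deprecating addSplitsBack in favor of an overload with an isReportedByReader flag, existing enumerators need no change at all, and the two situations get two clearly named entry points.
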
