Hi Becket and Leonard,

Thanks for your advice.

> put all the reader information in the SplitEnumerator context

I have one concern with this: the current registeredReaders in SourceCoordinatorContext are removed when subtaskReset is executed on failure. That said, the approach has merit. I also found one more situation that my previous design does not cover:

1. Initial state: Reader A reports splits (1, 2).
2. Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader B.
3. Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts.
4. Recovery issue: Upon recovery, Reader A re-reports split (1). In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss.

Thus, when the enumerator receives a split, the split may originate from three scenarios:

1. The split is reported by a reader during a global restore.
2. The split is reported by a reader during a partial failure recovery.
3. The split is not reported by a reader, but was assigned after the last successful checkpoint and never acknowledged.

In the first scenario (global restore), the split should be redistributed. In the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask. By adding a method to the SplitEnumerator context that tracks each assigned split's status, the enumerator can correctly identify and resolve split ownership in all three scenarios.

What about adding a `SplitRecoveryType splitRecoveryType(Split split)` method to SplitEnumeratorContext? SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and `POST_CHECKPOINT_ASSIGNMENT`.
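To make this concrete, here is a rough sketch of the proposed addition (the generic parameter and the helper names in the usage snippet, such as originalSubtask and pendingSplits, are illustrative only, not part of the proposal):

    // Proposed enum describing how a split arrived back at the enumerator.
    public enum SplitRecoveryType {
        UNASSIGNED,                 // not a recovered split at all
        GLOBAL_RESTORE,             // reported by a reader during a global restore
        PARTIAL_FAILOVER,           // reported by a reader during a partial failover
        POST_CHECKPOINT_ASSIGNMENT  // assigned after the last checkpoint, never acknowledged
    }

    public interface SplitEnumeratorContext<SplitT extends SourceSplit> {
        // Proposed addition: lets the enumerator ask how a split should be
        // treated on recovery.
        SplitRecoveryType splitRecoveryType(SplitT split);
        // ... existing methods unchanged ...
    }

An enumerator could then branch on the result:

    switch (context.splitRecoveryType(split)) {
        case GLOBAL_RESTORE:
            // global restore: re-run the current partitioner strategy
            pendingSplits.add(split);
            break;
        case PARTIAL_FAILOVER:
        case POST_CHECKPOINT_ASSIGNMENT:
            // send the split straight back to its original subtask
            context.assignSplit(split, originalSubtask(split));
            break;
        default:
            pendingSplits.add(split);
            break;
    }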
What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice.

Best,
Hongshun

On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:

> Thanks for the explanation, Hongshun.
>
> The current pattern of handling new reader registration is the following:
> 1. Put all the reader information in the SplitEnumerator context.
> 2. Notify the enumerator about the new reader registration.
> 3. Let the split enumerator get whatever information it wants from the context and do its job.
>
> This pattern decouples the information passing from the reader registration notification. This makes the API extensible - we can add more information (e.g. reported assigned splits in our case) about the reader to the context without introducing new methods.
>
> Introducing a new method of addSplitsBackOnRecovery() is redundant to the above pattern. Do we really need it?
>
> Thanks,
>
> Jiangjie (Becket) Qin
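A minimal sketch of this pattern on the enumerator side, assuming (hypothetically) that ReaderInfo were extended with a getReportedSplits() accessor, and with decideAssignment() standing in for the enumerator's own policy:

    @Override
    public void addReader(int subtaskId) {
        // Steps 1 + 2: by the time the enumerator is notified, the context
        // already holds the information about the registered reader.
        ReaderInfo info = context.registeredReaders().get(subtaskId);
        List<MySplit> reported = info.getReportedSplits(); // hypothetical accessor

        // Step 3: the enumerator decides, then assigns through the one
        // existing assignment channel.
        context.assignSplits(
                new SplitsAssignment<>(Map.of(subtaskId, decideAssignment(reported))));
    }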
> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>
> > Hi Becket,
> >
> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
> >
> > In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back:
> > 1. Job-level restore: the job is restored, and splits from the readers' state are reported by ReaderRegistrationEvent.
> > 2. Reader-level restart: a reader is restarted but not the whole job, returning the splits assigned to it after the last successful checkpoint. This is what addSplitsBack used to do.
> >
> > In these two situations, the enumerator will choose different strategies:
> > 1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy.
> > 2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution.
> >
> > Therefore, the enumerator must clearly distinguish between these two scenarios. I originally proposed deprecating addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
> > Leonard suggested using another method, addSplitsBackOnRecovery, that does not affect the current addSplitsBack.
> >
> > Best
> > Hongshun
> >
> > On 2025/09/08 17:20:31 Becket Qin wrote:
> > > Hi Leonard,
> > >
> > > > Could we introduce a new method like addSplitsBackOnRecovery with default implementation. In this way, we can provide better backward compatibility and also makes it easier for developers to understand.
> > >
> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack() is also only called upon recovery. So the new method seems confusing. One thing worth clarifying is if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former.
> > >
> > > Also, can you elaborate on the backwards compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface of "SupportSplitReassignmentOnRecovery", right?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:
> > >
> > > > Hi devs,
> > > >
> > > > It has been quite some time since this FLIP[1] was first proposed. Thank you for your valuable feedback—based on your suggestions, the FLIP has undergone several rounds of revisions.
> > > >
> > > > Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.
> > > >
> > > > Best
> > > > Hongshun
> > > >
> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
> > > >
> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:
> > > >
> > > > > Hi Leonard,
> > > > > Thanks for your advice. It makes sense and I have modified it.
> > > > >
> > > > > Best,
> > > > > Hongshun
> > > > >
> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
> > > > >
> > > > >> Thanks Hongshun and Becket for the deep discussion.
> > > > >>
> > > > >> I only have one comment for one API design:
> > > > >>
> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with param isReportedByReader instead. Because the enumerator can apply different reassignment policies based on the context.
> > > > >>
> > > > >> Could we introduce a new method like addSplitsBackOnRecovery with default implementation. In this way, we can provide better backward compatibility and also makes it easier for developers to understand.
> > > > >>
> > > > >> Best,
> > > > >> Leonard
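A sketch of Leonard's suggestion as a default method, assuming the default simply delegates to today's behavior so that existing enumerators compile and behave unchanged (the delegation choice is illustrative, not decided in the thread):

    public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
            extends AutoCloseable, CheckpointListener {

        // Existing method, untouched: splits assigned after the last
        // successful checkpoint come back here on reader restart.
        void addSplitsBack(List<SplitT> splits, int subtaskId);

        // Suggested addition for splits reported by readers on recovery.
        default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
            addSplitsBack(splits, subtaskId);
        }

        // ... other existing methods elided ...
    }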
> > > > >> > > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* > with > > > > default > > > > >> implementation. In this way, we can provide better backward > > > > >> compatibility and also makes it easier for developers to > understand. > > > > >> > > > > >> Best, > > > > >> Leonard > > > > >> > > > > >> > > > > >> > > > > >> 2025 9月 3 20:26,Hongshun Wang <[email protected]> 写道: > > > > >> > > > > >> Hi Becket, > > > > >> > > > > >> I think that's a great idea! I have added the > > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. If a > > Source > > > > >> implements this interface indicates that the source operator needs > > to > > > > >> report splits to the enumerator and receive reassignment.[1] > > > > >> > > > > >> Best, > > > > >> Hongshun > > > > >> > > > > >> [1] > > > > >> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment > > > > >> > > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> > > > > wrote: > > > > >> > > > > >>> Hi Hongshun, > > > > >>> > > > > >>> I think the convention for such optional features in Source is > via > > > > >>> mix-in interfaces. So instead of adding a method to the > > SourceReader, > > > > maybe > > > > >>> we should introduce an interface > SupportSplitReassingmentOnRecovery > > > > with > > > > >>> this method. If a Source implementation implements that > interface, > > > > then the > > > > >>> SourceOperator will check the desired behavior and act > accordingly. > > > > >>> > > > > >>> Thanks, > > > > >>> > > > > >>> Jiangjie (Becket) Qin > > > > >>> > > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang < > > [email protected] > > > > > > > > > >>> wrote: > > > > >>> > > > > >>>> Hi de vs, > > > > >>>> > > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your > > feedback > > > > >>>> and suggestions. > > > > >>>> > > > > >>>> Best, > > > > >>>> Hongshun > > > > >>>> > > > > >>>> > > > > >>>> 2025年8月13日 14:23,Hongshun Wang <[email protected]> 写道: > > > > >>>> > > > > >>>> Hi Becket, > > > > >>>> > > > > >>>> Thank you for your detailed feedback. The new contract makes > good > > > > sense > > > > >>>> to me and effectively addresses the issues I encountered at the > > > > beginning > > > > >>>> of the design. > > > > >>>> > > > > >>>> That said, I recommend not reporting splits by default, > primarily > > for > > > > >>>> compatibility and practical reasons: > > > > >>>> > > > > >>>> > For these reasons, we do not expect the Split objects to be > > huge, > > > > >>>> and we are not trying to design for huge Split objects either as > > they > > > > will > > > > >>>> have problems even today. > > > > >>>> > > > > >>>> 1. > > > > >>>> > > > > >>>> Not all existing connector match this rule > > > > >>>> For example, in mysql cdc connector, a binlog split may > contain > > > > >>>> hundreds (or even more) snapshot split completion records. > This > > > > state is > > > > >>>> large and is currently transmitted incrementally through > > multiple > > > > >>>> BinlogSplitMetaEvent messages. Since the binlog reader > operates > > > > >>>> with single parallelism, reporting the full split state on > > recovery > > > > >>>> could be inefficient or even infeasible. > > > > >>>> For such sources, it would be better to provide a mechanism > to > > skip > > > > >>>> split reporting during restart until they redesign and reduce > > the > > > > >>>> split size. 
> > > > >>>> 2. > > > > >>>> > > > > >>>> Not all enumerators maintain unassigned splits in state. > > > > >>>> Some SplitEnumerator(such as kafka connector) implementations > > do > > > > >>>> not track or persistently manage unassigned splits. Requiring > > them > > > > to > > > > >>>> handle re-registration would add unnecessary complexity. Even > > > > though we > > > > >>>> maybe implements in kafka connector, currently, kafka > connector > > is > > > > decouple > > > > >>>> with flink version, we also need to make sure the elder > version > > is > > > > >>>> compatible. > > > > >>>> > > > > >>>> ------------------------------ > > > > >>>> > > > > >>>> To address these concerns, I propose introducing a new method: > > boolean > > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a default > > > > >>>> implementation returning false. This allows source readers to > opt > > in > > > > >>>> to split reassignment only when necessary. Since the new > contract > > > > already > > > > >>>> places the responsibility for split assignment on the > enumerator, > > not > > > > >>>> reporting splits by default is a safe and clean default > behavior. > > > > >>>> > > > > >>>> > > > > >>>> ------------------------------ > > > > >>>> > > > > >>>> > > > > >>>> I’ve updated the implementation and the FIP accordingly[1]. It > > quite a > > > > >>>> big change. In particular, for the Kafka connector, we can now > use > > a > > > > >>>> pluggable SplitPartitioner to support different split assignment > > > > >>>> strategies (e.g., default, round-robin). > > > > >>>> > > > > >>>> > > > > >>>> Could you please review it when you have a chance? > > > > >>>> > > > > >>>> > > > > >>>> Best, > > > > >>>> > > > > >>>> Hongshun > > > > >>>> > > > > >>>> > > > > >>>> [1] > > > > >>>> > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment > > > > >>>> > > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> > > > > wrote: > > > > >>>> > > > > >>>>> Hi Hongshun, > > > > >>>>> > > > > >>>>> I am not too concerned about the transmission cost. Because the > > full > > > > >>>>> split transmission has to happen in the initial assignment > phase > > > > already. > > > > >>>>> And in the future, we probably want to also introduce some kind > > of > > > > workload > > > > >>>>> balance across source readers, e.g. based on the per-split > > > > throughput or > > > > >>>>> the per-source-reader workload in heterogeneous clusters. For > > these > > > > >>>>> reasons, we do not expect the Split objects to be huge, and we > > are > > > > not > > > > >>>>> trying to design for huge Split objects either as they will > have > > > > problems > > > > >>>>> even today. > > > > >>>>> > > > > >>>>> Good point on the potential split loss, please see the reply > > below: > > > > >>>>> > > > > >>>>> Scenario 2: > > > > >>>>> > > > > >>>>> > > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 > > and 4) > > > > >>>>>> upon restart. > > > > >>>>>> 2. Before the enumerator receives all reports and performs > > > > >>>>>> reassignment, a checkpoint is triggered. > > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have > > empty > > > > >>>>>> states. > > > > >>>>>> 4. When restarting from this checkpoint, all four splits are > > lost. 
> > > > >>>>> > > > > >>>>> The reader registration happens in the SourceOperator.open(), > > which > > > > >>>>> means the task is still in the initializing state, therefore > the > > > > checkpoint > > > > >>>>> should not be triggered until the enumerator receives all the > > split > > > > reports. > > > > >>>>> > > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the > JM > > is > > > > >>>>> async. So it is possible that the SourceOpertor.open() has > > returned, > > > > but > > > > >>>>> the enumerator has not received the split reports. However, > > because > > > > the > > > > >>>>> task status update RPC call goes to the same channel as the > split > > > > reports > > > > >>>>> call, so the task status RPC call will happen after the split > > > > reports call > > > > >>>>> on the JM side. Therefore, on the JM side, the > SourceCoordinator > > will > > > > >>>>> always first receive the split reports, then receive the > > checkpoint > > > > request. > > > > >>>>> This "happen before" relationship is kind of important to > > guarantee > > > > >>>>> the consistent state between enumerator and readers. > > > > >>>>> > > > > >>>>> Scenario 1: > > > > >>>>> > > > > >>>>> > > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), > and > > > > >>>>>> Reader B reports (3 and 4). > > > > >>>>>> 2. The enumerator receives these reports but only reassigns > > splits 1 > > > > >>>>>> and 2 — not 3 and 4. > > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 > > and 2 > > > > >>>>>> are recorded in the reader states; splits 3 and 4 are not > > persisted. > > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits > 3 > > and > > > > 4 > > > > >>>>>> will be permanently lost. > > > > >>>>> > > > > >>>>> This scenario is possible. One solution is to let the > enumerator > > > > >>>>> implementation handle this. That means if the enumerator relies > > on > > > > the > > > > >>>>> initial split reports from the source readers, it should > maintain > > > > these > > > > >>>>> reports by itself. In the above example, the enumerator will > need > > to > > > > >>>>> remember that 3 and 4 are not assigned and put it into its own > > state. > > > > >>>>> The current contract is that anything assigned to the > > SourceReaders > > > > >>>>> are completely owned by the SourceReaders. Enumerators can > > remember > > > > the > > > > >>>>> assignments but cannot change them, even when the source reader > > > > recovers / > > > > >>>>> restarts. > > > > >>>>> With this FLIP, the contract becomes that the source readers > will > > > > >>>>> return the ownership of the splits to the enumerator. So the > > > > enumerator is > > > > >>>>> responsible for maintaining these splits, until they are > assigned > > to > > > > a > > > > >>>>> source reader again. > > > > >>>>> > > > > >>>>> There are other cases where there may be conflict information > > between > > > > >>>>> reader and enumerator. For example, consider the following > > sequence: > > > > >>>>> 1. reader A reports splits (1 and 2) up on restart. > > > > >>>>> 2. enumerator receives the report and assigns both 1 and 2 to > > reader > > > > B. > > > > >>>>> 3. reader A failed before checkpointing. And this is a partial > > > > >>>>> failure, so only reader A restarts. > > > > >>>>> 4. When reader A recovers, it will again report splits (1 and > 2) > > to > > > > >>>>> the enumerator. > > > > >>>>> 5. 
> > > >>>>>
> > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
> > > >>>>>
> > > >>>>>> Hi Becket,
> > > >>>>>>
> > > >>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.
> > > >>>>>>
> > > >>>>>> 1. High Transmission Cost
> > > >>>>>> If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission — especially when many splits are involved.
> > > >>>>>>
> > > >>>>>> 2. Risk of Split Loss
> > > >>>>>> The risk of split loss exists unless we have a mechanism to ensure a checkpoint can only be taken after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:
> > > >>>>>>
> > > >>>>>> Scenario 1:
> > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
> > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2 — not 3 and 4.
> > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
> > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
> > > >>>>>>
> > > >>>>>> Scenario 2:
> > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
> > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
> > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
> > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
> > > >>>>>>
> > > >>>>>> Let me know if you have thoughts on how we might mitigate these risks!
> > > >>>>>>
> > > >>>>>> Best
> > > >>>>>> Hongshun
> > > >>>>>>
> > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Hongshun,
> > > >>>>>>>
> > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
> > > >>>>>>>
> > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
> > > >>>>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits. (Not via the response of the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered.)
> > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.
> > > >>>>>>>
> > > >>>>>>> This protocol has a few benefits:
> > > >>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
> > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
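The three steps, condensed into an illustrative sketch; MySplit, splitSerializer, restoredSplits and chosenAssignment are assumed context, and the fragments belong to different classes:

    // Step 1: on SourceOperator startup, report the restored splits;
    // do not add them to the SourceReader.
    operatorEventGateway.sendEventToCoordinator(
            new ReaderRegistrationEvent(subtaskId, location, restoredSplits));

    // Step 2: enumerator side, the single assignment path.
    context.assignSplits(new SplitsAssignment<>(chosenAssignment));

    // Step 3: SourceOperator, splits reach the reader only once an
    // AddSplitEvent arrives from the enumerator.
    if (event instanceof AddSplitEvent) {
        sourceReader.addSplits(
                ((AddSplitEvent<MySplit>) event).splits(splitSerializer));
    }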
> > > >>>>>>>
> > > >>>>>>> So we only need one interface change:
> > > >>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can access it.
> > > >>>>>>> and one behavior change:
> > > >>>>>>> - The SourceOperator should stop assigning splits to the reader from state restoration, but
[message truncated...]
