Hi devs,

If there are no further suggestions, I will start the voting tomorrow.
Best,
Hongshun

On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> wrote:
> Hi Becket and Leonard,
>
> I have updated the content of this FLIP. The key point is that:
>
> When the split enumerator receives a split, *the split must already exist in pendingSplitAssignments or assignedSplitAssignments*.
>
> - If the split is in pendingSplitAssignments, ignore it.
> - If the split is in assignedSplitAssignments but has a different taskId, ignore it (this indicates it was already assigned to another task).
> - If the split is in assignedSplitAssignments and shares the same taskId, move the assignment from assignedSplitAssignments to pendingSplitAssignments to re-assign it (see the sketch after this message).
>
> To better explain why these strategies are used, I added some examples and pictures to the FLIP.
>
> Would you like to help me check whether there are still any problems?
>
> Best,
> Hongshun
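For illustration, the three rules above as a minimal Java sketch. The field names pendingSplitAssignments and assignedSplitAssignments follow the FLIP text; the class and method shapes, and keeping pending splits as a set of ids, are assumptions, not the actual FLIP-537 code:

    import java.util.HashMap;
    import java.util.LinkedHashSet;
    import java.util.Map;
    import java.util.Set;

    // Hypothetical enumerator-side bookkeeping for splits reported by readers.
    class SplitAssignmentTracker {
        // Split ids waiting to be (re-)assigned.
        private final Set<String> pendingSplitAssignments = new LinkedHashSet<>();
        // splitId -> taskId the split is currently assigned to.
        private final Map<String, Integer> assignedSplitAssignments = new HashMap<>();

        /** Applies the three rules above to a split reported by taskId. */
        void onSplitReported(String splitId, int taskId) {
            if (pendingSplitAssignments.contains(splitId)) {
                return; // Rule 1: already pending, nothing to do.
            }
            Integer owner = assignedSplitAssignments.get(splitId);
            if (owner == null || owner != taskId) {
                return; // Rule 2: assigned to another task (or unknown), ignore it.
            }
            // Rule 3: the same task reported it back, so move it to pending
            // and let the enumerator re-assign it.
            assignedSplitAssignments.remove(splitId);
            pendingSplitAssignments.add(splitId);
        }
    }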
> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>
>> Thanks Becket and Hongshun for the insightful discussion.
>>
>> The underlying implementation and communication mechanisms of the Flink Source indeed involve many intricate details. We discussed the issue of split re-assignment in specific scenarios, but fortunately, the final decision turned out to be pretty clear.
>>
>> +1 to Becket’s proposal, which keeps the framework cleaner and more flexible.
>> +1 to Hongshun’s point to provide comprehensive guidance for connector developers.
>>
>> Best,
>> Leonard
>>
>> On Sep 26, 2025 at 16:30, Hongshun Wang <[email protected]> wrote:
>>
>> Hi Becket,
>>
>> I got it. You’re suggesting we should not handle this in the source framework but instead let the split enumerator manage these three scenarios.
>>
>> Let me explain why I originally favored handling it in the framework: I'm concerned that connector developers might overlook certain edge cases (after all, we even needed extensive discussions to fully clarify the logic).
>>
>> However, your point keeps the framework cleaner and more flexible. Thus, I will take it.
>>
>> Perhaps, in this FLIP, we should focus on providing comprehensive guidance for connector developers: explain how to implement a split enumerator, including the underlying challenges and their solutions.
>>
>> Additionally, we can use the Kafka connector as a reference implementation to demonstrate the practical steps. This way, developers who want to implement similar connectors can directly reference this example.
>>
>> Best,
>> Hongshun
>>
>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>
>>> It would be good to not expose runtime details to the source implementation if possible.
>>>
>>> Today, the split enumerator implementations are expected to track the split assignment.
>>>
>>> Assuming the split enumerator implementation keeps a split assignment map, that means the enumerator should already know whether a split is assigned or unassigned. So it can handle the three scenarios you mentioned.
>>>
>>>> The split is reported by a reader during a global restoration.
>>>
>>> The split enumerator should have just been restored / created. If the enumerator expects a full reassignment of splits upon global recovery, there should be no splits assigned to that reader in the split assignment mapping.
>>>
>>>> The split is reported by a reader during a partial failure recovery.
>>>
>>> In this case, when SplitEnumerator.addReader() is invoked, the split assignment map in the enumerator implementation should already have some split assignments for the reader. Therefore it is a partial failover. If the source supports split reassignment on recovery, the enumerator can assign splits that are different from the reported assignment of that reader in the SplitEnumeratorContext, or it can also assign the same splits. In any case, the enumerator knows that this is a partial recovery because the assignment map is non-empty.
>>>
>>>> The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged.
>>>
>>> This is actually one of the steps in a partial failure recovery. SplitEnumerator.addSplitsBack() will be called first, before SplitEnumerator.addReader() is called for the recovered reader. When SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial recovery, and the enumerator should remove these splits from the split assignment map as if they were never assigned.
>>>
>>> I think this should work, right?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> wrote:
>>>
>>>> Hi Becket and Leonard,
>>>>
>>>> Thanks for your advice.
>>>>
>>>>> put all the reader information in the SplitEnumerator context
>>>>
>>>> I have a concern: the current registeredReaders in *SourceCoordinatorContext* will be removed after subtaskReset is executed on failure. However, this approach has merit.
>>>>
>>>> One more situation I found that my previous design does not cover:
>>>>
>>>> 1. Initial state: Reader A reports splits (1, 2).
>>>> 2. Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader B.
>>>> 3. Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts.
>>>> 4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>
>>>> In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss.
>>>>
>>>> Thus, when the enumerator receives a split, the split may originate from three scenarios:
>>>>
>>>> 1. The split is reported by a reader during a global restoration.
>>>> 2. The split is reported by a reader during a partial failure recovery.
>>>> 3. The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged.
>>>>
>>>> In the first scenario (global restore), the split should be re-distributed. For the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask.
>>>>
>>>> By implementing a method in the SplitEnumerator context to track each assigned split's status, the system can correctly identify and resolve split ownership in all three scenarios. *What about adding a `SplitRecoveryType splitRecoveryType(Split split)` method in SplitEnumeratorContext?* SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and `POST_CHECKPOINT_ASSIGNMENT` (see the sketch after this message).
>>>>
>>>> What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice.
>>>>
>>>> Best,
>>>> Hongshun
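For clarity, the proposal above as a minimal sketch. The enum values and the method signature come from the message; the surrounding interface shape is an assumption, and the idea was ultimately dropped in favor of letting the enumerator track assignments itself:

    // Sketch of the proposed SplitEnumeratorContext addition (a
    // discussion-stage idea only, not the final FLIP design).
    enum SplitRecoveryType {
        UNASSIGNED,                // not known to have been assigned
        GLOBAL_RESTORE,            // reported by a reader during a global restore
        PARTIAL_FAILOVER,          // reported by a reader during a partial failover
        POST_CHECKPOINT_ASSIGNMENT // assigned after the last checkpoint, never acknowledged
    }

    interface SplitEnumeratorContextWithRecoveryType<SplitT> {
        /** Would let the enumerator ask the framework how a given split came back. */
        SplitRecoveryType splitRecoveryType(SplitT split);
    }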
>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:
>>>>
>>>>> Thanks for the explanation, Hongshun.
>>>>>
>>>>> The current pattern of handling new reader registration is the following:
>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>> 2. Notify the enumerator about the new reader registration.
>>>>> 3. Let the split enumerator get whatever information it wants from the context and do its job.
>>>>> This pattern decouples the information passing from the reader registration notification. It makes the API extensible: we can add more information (e.g. the reported assigned splits in our case) about the reader to the context without introducing new methods.
>>>>>
>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant given the above pattern. Do we really need it?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>>
>>>>>>> I am curious what the enumerator would do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>>
>>>>>> In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back:
>>>>>> 1. Job-level restore: the job is restored, and splits from the readers' state are reported via ReaderRegistrationEvent.
>>>>>> 2. Reader-level restart: a single reader is restarted, not the whole job, and the splits assigned to it after the last successful checkpoint are added back. This is what addSplitsBack used to do.
>>>>>>
>>>>>> In these two situations, the enumerator will choose different strategies:
>>>>>> 1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy.
>>>>>> 2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution.
>>>>>>
>>>>>> Therefore, the enumerator must clearly distinguish between these two scenarios. I originally proposed deprecating the old addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader). Leonard suggested using another method, addSplitsBackOnRecovery, without changing the current addSplitsBack (both shapes are sketched after this message).
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
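As a reference, the two API shapes discussed above in one sketch. Only the signatures come from the thread; the trimmed-down interface around them is assumed:

    import java.util.List;

    // Sketch of the two alternatives for handing splits back to the enumerator.
    interface SplitEnumeratorSketch<SplitT> {

        // Existing method: called when a single reader restarts and the splits
        // assigned to it after the last successful checkpoint come back.
        void addSplitsBack(List<SplitT> splits, int subtaskId);

        // Alternative 1 (originally proposed): an overload with a flag telling
        // the enumerator whether the splits were reported by a reader on restore.
        void addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader);

        // Alternative 2 (Leonard's suggestion): a separate method with a default
        // implementation, so existing enumerators keep compiling unchanged.
        default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
            addSplitsBack(splits, subtaskId);
        }
    }

Alternative 2 keeps existing enumerator implementations source-compatible, which is the backward-compatibility argument made below.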
>>>>>> On 2025/09/08 17:20:31, Becket Qin wrote:
>>>>>>> Hi Leonard,
>>>>>>>
>>>>>>>> Could we introduce a new method like addSplitsBackOnRecovery with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand.
>>>>>>>
>>>>>>> I am curious what the enumerator would do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack() is also only called upon recovery, so the new method seems confusing. One thing worth clarifying: if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former.
>>>>>>>
>>>>>>> Also, can you elaborate on the backwards compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface "SupportSplitReassignmentOnRecovery", right?
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi devs,
>>>>>>>>
>>>>>>>> It has been quite some time since this FLIP[1] was first proposed. Thank you for your valuable feedback; based on your suggestions, the FLIP has undergone several rounds of revisions.
>>>>>>>>
>>>>>>>> Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>
>>>>>>>> On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Hi Leonard,
>>>>>>>>> Thanks for your advice. It makes sense and I have modified it.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>
>>>>>>>>>> I only have one comment on one API design:
>>>>>>>>>>
>>>>>>>>>>> Deprecate the old addSplitsBack method and use an addSplitsBack with a param isReportedByReader instead, because the enumerator can apply different reassignment policies based on the context.
>>>>>>>>>>
>>>>>>>>>> Could we introduce a new method like *addSplitsBackOnRecovery* with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Leonard
>>>>>>>>>>
>>>>>>>>>> On Sep 3, 2025 at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Becket,
>>>>>>>>>>
>>>>>>>>>> I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP.
>>>>>>>>>> If a Source implements this interface, it indicates that the source operator needs to report splits to the enumerator and receive the reassignment.[1]
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>>
>>>>>>>>>>> I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
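A minimal sketch of the mix-in convention described above. The interface name comes from the FLIP; showing it as a method-free marker checked by the SourceOperator is an assumption for illustration:

    // Hypothetical marker-style mix-in: implementing it on a Source signals
    // that, on recovery, restored splits should be reported to the enumerator
    // for reassignment instead of being added to the SourceReader directly.
    interface SupportSplitReassignmentOnRecovery {
    }

    class SourceOperatorSketch {
        // Simplified stand-ins for the runtime pieces involved.
        interface Source {}
        interface SplitReport { void reportToEnumerator(); void addToReader(); }

        // Hypothetical check inside the source framework on startup.
        static void openWithRestoredSplits(Source source, SplitReport restoredSplits) {
            if (source instanceof SupportSplitReassignmentOnRecovery) {
                restoredSplits.reportToEnumerator(); // opt-in: wait for reassignment
            } else {
                restoredSplits.addToReader(); // legacy behavior, unchanged
            }
        }
    }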
>>>>>>>>>>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi devs,
>>>>>>>>>>>>
>>>>>>>>>>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>
>>>>>>>>>>>> On Aug 13, 2025 at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>
>>>>>>>>>>>> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
>>>>>>>>>>>>
>>>>>>>>>>>> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:
>>>>>>>>>>>>
>>>>>>>>>>>>> For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>>>>>>>>>>>
>>>>>>>>>>>> 1. Not all existing connectors match this rule.
>>>>>>>>>>>> For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) of snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.
>>>>>>>>>>>>
>>>>>>>>>>>> 2. Not all enumerators maintain unassigned splits in state.
>>>>>>>>>>>> Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the Kafka connector is currently decoupled from the Flink version, so we also need to make sure that older versions stay compatible.
>>>>>>>>>>>>
>>>>>>>>>>>> To address these concerns, I propose introducing a new method: boolean SourceReader#shouldReassignSplitsOnRecovery() with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.
>>>>>>>>>>>>
>>>>>>>>>>>> I’ve updated the implementation and the FLIP accordingly[1]. It is quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin); see the sketch after this message.
>>>>>>>>>>>>
>>>>>>>>>>>> Could you please review it when you have a chance?
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>>
>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
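The pluggable partitioner mentioned above might look roughly like this. The SplitPartitioner name comes from the message; the interface shape and the round-robin example are assumptions:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical pluggable assignment strategy for the enumerator.
    interface SplitPartitioner<SplitT> {
        /** Decides which subtask each split should be assigned to. */
        Map<Integer, List<SplitT>> assign(List<SplitT> splits, int parallelism);
    }

    // Example strategy: spread splits over subtasks in round-robin order.
    class RoundRobinSplitPartitioner<SplitT> implements SplitPartitioner<SplitT> {
        @Override
        public Map<Integer, List<SplitT>> assign(List<SplitT> splits, int parallelism) {
            Map<Integer, List<SplitT>> assignment = new HashMap<>();
            for (int i = 0; i < splits.size(); i++) {
                assignment.computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                        .add(splits.get(i));
            }
            return assignment;
        }
    }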
>>>>>>>>>>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not too concerned about the transmission cost, because the full split transmission has to happen in the initial assignment phase already. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on the per-split throughput or the per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Good point on the potential split loss, please see the reply below:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Scenario 2:
>>>>>>>>>>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>>>>>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>>>>>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.
>>>>>>>>>>>>>
>>>>>>>>>>>>> There is a nuance here. Today, the RPC call from the TM to the JM is async, so it is possible that SourceOperator.open() has returned but the enumerator has not received the split reports. However, because the task status update RPC call goes to the same channel as the split report call, the task status RPC call will happen after the split report call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports, then receive the checkpoint request. This "happens-before" relationship is important to guarantee consistent state between the enumerator and the readers.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Scenario 1:
>>>>>>>>>>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>>>>>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>>>>>>>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>>>>>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts. With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
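To spell out what that ownership implies, a minimal sketch of the enumerator-side state under the new contract; the names and structure are assumed for illustration:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical illustration: under the new contract, splits handed back by
    // readers are owned by the enumerator, so they must survive in the
    // enumerator checkpoint until they are assigned to a reader again.
    class EnumeratorOwnedSplits<SplitT> {
        private final List<SplitT> unassignedSplits = new ArrayList<>();

        void onSplitsReturned(List<SplitT> splits) {
            unassignedSplits.addAll(splits); // ownership moves back to the enumerator
        }

        void onSplitsAssigned(List<SplitT> splits) {
            unassignedSplits.removeAll(splits); // ownership moves to a reader
        }

        // What SplitEnumerator.snapshotState() would have to include so that
        // not-yet-reassigned splits are not lost on a checkpoint-based restart.
        List<SplitT> snapshotState() {
            return new ArrayList<>(unassignedSplits);
        }
    }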
>>>>>>>>>>>>> There are other cases where there may be conflicting information between the reader and the enumerator. For example, consider the following sequence:
>>>>>>>>>>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>>>>>>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>>>>>>>>>> 3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
>>>>>>>>>>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
>>>>>>>>>>>>> 5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So with the new contract, the enumerator should be the source of truth for split ownership.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. High Transmission Cost
>>>>>>>>>>>>>> If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Risk of Split Loss
>>>>>>>>>>>>>> Risk of split loss exists unless we have a mechanism to make sure a checkpoint can only be taken after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Scenario 1:
>>>>>>>>>>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>>>>>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>>>>>>>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>>>>>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Scenario 2:
>>>>>>>>>>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>>>>>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>>>>>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Hongshun,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
>>>>>>>>>>>>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits (not via the response to the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered).
>>>>>>>>>>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This protocol has a few benefits:
>>>>>>>>>>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>>>>>>>>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So we only need one interface change:
>>>>>>>>>>>>>>> - Add the initially assigned splits to ReaderInfo so the Enumerator can access it.
>>>>>>>>>>>>>>> and one behavior change:
>>>>>>>>>>>>>>> - The SourceOperator should stop assigning splits to the SourceReader from state restoration, but [message truncated...]
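Putting the three steps of that protocol together, a rough sketch of the message flow on the SourceOperator side. The event and method names follow the discussion; the gateway and reader types are simplified stand-ins:

    import java.util.List;

    // Simplified illustration of the protocol above; only the flow is the point.
    class SourceOperatorFlowSketch<SplitT> {
        interface EnumeratorGateway<SplitT> {
            void sendReaderRegistrationEvent(int subtaskId, List<SplitT> restoredSplits);
        }
        interface Reader<SplitT> {
            void addSplits(List<SplitT> splits);
        }

        private final int subtaskId;
        private final EnumeratorGateway<SplitT> gateway;
        private final Reader<SplitT> reader;

        SourceOperatorFlowSketch(int subtaskId, EnumeratorGateway<SplitT> gateway, Reader<SplitT> reader) {
            this.subtaskId = subtaskId;
            this.gateway = gateway;
            this.reader = reader;
        }

        // Step 1: report the restored splits instead of adding them to the reader.
        void open(List<SplitT> restoredSplits) {
            gateway.sendReaderRegistrationEvent(subtaskId, restoredSplits);
            // Note: no reader.addSplits(restoredSplits) here anymore.
        }

        // Step 3: splits only reach the reader via an AddSplitEvent, which the
        // enumerator triggers through its context's assignSplits() (step 2).
        void handleAddSplitEvent(List<SplitT> splitsFromEnumerator) {
            reader.addSplits(splitsFromEnumerator);
        }
    }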
