Hi Becket,

Thanks for your explanation.

> For the same three input above, the assignment should be consistently the same.

That is exactly what troubles me. For assignment algorithms such as hash, it does behave consistently. But what if we use round-robin? Given the same reader information, the same split may be assigned to different readers. That is what I tried to show with the example I listed before:

1. Initial state: 2 parallelism, 2 splits.
2. Enumerator action: Split 1 → Task 1, Split 2 → Task 2.
3. Failure scenario: After Split 2 is assigned to Task 2 but before the next checkpoint succeeds, Task 2 restarts.
4. Recovery issue: Split 2 is re-added to the enumerator. The round-robin strategy assigns Split 2 to Task 1. Task 1 now has 2 splits, Task 2 has 0 → an imbalanced distribution.
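
To make this concrete, below is a minimal, self-contained sketch (the RoundRobinAssigner class is hypothetical, not code from any connector) of why a stateless round-robin strategy is not a pure function of the three inputs:

    // Hypothetical sketch: a stateless round-robin assigner. For the same
    // readers and the same split, the task a split lands on depends on how
    // many assignment calls happened before, not only on the three inputs.
    public class RoundRobinAssigner {
        private final int parallelism;
        private int nextTask = 0;

        public RoundRobinAssigner(int parallelism) {
            this.parallelism = parallelism;
        }

        public int assign(String splitId) {
            int task = nextTask;
            nextTask = (nextTask + 1) % parallelism;
            return task;
        }

        public static void main(String[] args) {
            RoundRobinAssigner assigner = new RoundRobinAssigner(2);
            System.out.println("split-1 -> task " + assigner.assign("split-1")); // task 0
            System.out.println("split-2 -> task " + assigner.assign("split-2")); // task 1
            // split-2 comes back via addSplitsBack() before a checkpoint succeeds;
            // round-robin now hands it to task 0: task 0 owns 2 splits, task 1 owns 0.
            System.out.println("split-2 -> task " + assigner.assign("split-2")); // task 0
        }
    }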
> Please let me know if you think a meeting would be more efficient.

Yes, I'd like to reach an agreement as soon as possible. If you're available, we could schedule a meeting with Leonard as well.

Best,
Hongshun

On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:

> Hi Hongshun,
>
> I am confused. First of all, regardless of what the assignment algorithm is, using SplitEnumeratorContext to return the splits only gives more information than using addSplitsBack(). So there should be no regression.
>
> Secondly, at this point, the SplitEnumerator should only take the following three inputs to generate the global split assignment:
> 1. the reader information (num readers, locations, etc.)
> 2. all the splits to assign
> 3. the configured assignment algorithm
> Preferably, for the same three inputs above, the assignment should be consistently the same. I don't see why it should care about why a new reader is added, whether due to partial failover or global failover or job restart.
>
> If you want to do global redistribution on global failover and restart, but honor the existing assignment for partial failover, the enumerator will just do the following:
> 1. Generate a new global assignment (global redistribution) in start(), because start() will only be invoked on global failover or restart. That means all the readers are also new, with empty assignments.
> 2. After the global assignment is generated, it should be honored for the whole life cycle. There might be many reader registrations, again for different reasons, but that does not matter:
>    - reader registration after this job restart
>    - reader registration after this global failover
>    - reader registration due to partial failover, which may or may not have an addSplitsBack() call.
> Regardless of the reason, the split enumerator will just enforce the global assignment it has already generated, i.e. without split redistribution.
>
> Wouldn't that give the behavior you want? I feel the discussion somehow goes in circles. Please let me know if you think a meeting would be more efficient.
>
> Thanks,
>
> Jiangjie (Becket) Qin
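
For reference, the contract Becket describes above could look roughly like the sketch below (the class and the assignSplit helper are made up for illustration; a real enumerator would assign via SplitEnumeratorContext.assignSplits()):

    // Sketch only: compute a global assignment once in start(), then honor it
    // for every reader registration, whatever the reason for the registration.
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class GlobalAssignmentEnumerator {
        // splitId -> owning subtask, computed once per start / global restore.
        private final Map<String, Integer> globalAssignment = new HashMap<>();

        // start() is only invoked on job start, restart, or global failover,
        // so this is the only place where redistribution happens.
        public void start(List<String> allSplits, int parallelism) {
            globalAssignment.clear();
            for (int i = 0; i < allSplits.size(); i++) {
                globalAssignment.put(allSplits.get(i), i % parallelism);
            }
        }

        // addReader() may fire after restart, global failover, or partial
        // failover; regardless, just re-enforce the existing assignment.
        public void addReader(int subtaskId) {
            globalAssignment.forEach((split, owner) -> {
                if (owner == subtaskId) {
                    assignSplit(split, subtaskId); // stand-in for context.assignSplits()
                }
            });
        }

        private void assignSplit(String split, int subtaskId) {
            System.out.println(split + " -> reader " + subtaskId);
        }
    }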
>
> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]> wrote:
>
>> Hi Becket,
>>
>> > Ignore a returned split if it has been assigned to a different reader, otherwise put it back to unassigned splits / pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This should work regardless of whether it is a global failover, partial failover, restart, etc. There is no need for the SplitEnumerator to distinguish what failover scenario it is.
>>
>> In this case, it seems that global failover and partial failover share the same distribution strategy if the split has not been assigned to a different reader. However, global failover needs redistribution (this is why we need this FLIP), while partial failover does not. I have no idea how we can distinguish them.
>>
>> What do you think?
>>
>> Best,
>> Hongshun
>>
>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]> wrote:
>>
>>> Hi Hongshun,
>>>
>>> The problem we are trying to solve here is to give the splits back to the SplitEnumerator. There are only two types of splits to give back:
>>> 1) Splits whose assignment has been checkpointed. In this case, we rely on addReader() + SplitEnumeratorContext to give the splits back; this provides more information associated with those splits.
>>> 2) Splits whose assignment has not been checkpointed. In this case, we use addSplitsBack(); there is no reader info to give because the previous assignment did not take effect to begin with.
>>>
>>> From the SplitEnumerator implementation perspective, the contract is straightforward:
>>> 1. The SplitEnumerator is the source of truth for assignment.
>>> 2. When the enumerator receives the addSplitsBack() call, it always adds these splits back to unassigned splits / pending splits.
>>> 3. When the enumerator receives the addReader() call, that means the reader has no current assignment and has returned its previous assignment based on the reader-side info. The SplitEnumerator checks the SplitEnumeratorContext to retrieve the returned splits from that reader (i.e. the previous assignment) and handles them according to its own source-of-truth knowledge of the assignment: ignore a returned split if it has been assigned to a different reader, otherwise put it back to unassigned splits / pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This should work regardless of whether it is a global failover, partial failover, restart, etc. There is no need for the SplitEnumerator to distinguish what failover scenario it is.
>>>
>>> Would this work?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <[email protected]> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> > why do we need to change the behavior of addSplitsBack()? Should it remain the same?
>>>>
>>>> How does the enumerator get the splits from the ReaderRegistrationEvent and then reassign them?
>>>>
>>>> You have given a piece of advice before:
>>>> > 1. Put all the reader information in the SplitEnumerator context. 2. Notify the enumerator about the new reader registration. 3. Let the split enumerator get whatever information it wants from the context and do its job.
>>>>
>>>> However, each time a source task fails over, the ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> registeredReaders will remove this reader's info. When the source task is registered again, it will be added again. Thus, registeredReaders cannot know whether the reader was registered before.
>>>>
>>>> Therefore, enumerator#addReader does not distinguish the following three situations:
>>>> 1. The reader is registered on a global restart. In this case, redistribute the splits from the infos (take off all the splits from ReaderInfo).
>>>> 2. The reader is registered on a partial failover (before the first successful checkpoint). In this case, ignore the splits from the infos (leave alone all the splits from ReaderInfo).
>>>> 3. The reader is registered on a partial failover (after the first successful checkpoint). In this case, we need to assign the splits to the same reader again (take off all the splits from ReaderInfo but assign them to it again).
>>>> We still need the enumerator to distinguish them (using pendingSplitAssignments & assignedSplitAssignments). However, it is redundant to maintain split assignment information both in the enumerator and in the enumerator context.
>>>>
>>>> I think if we change the behavior of addSplitsBack, it will be simpler: just let the enumerator handle these splits based on pendingSplitAssignments & assignedSplitAssignments.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> Thanks for updating the FLIP. A quick question: why do we need to change the behavior of addSplitsBack()? Should it remain the same?
>>>>>
>>>>> Regarding the case of restart with a changed subscription, I think the only correct behavior is removing obsolete splits without any warning / exception. It is OK to add info-level logging if we want to. It is a clear intention if the user has explicitly changed the subscription and restarted the job. There is no need to add a config to double confirm.
>>>>>
>>>>> Regards,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]> wrote:
>>>>>
>>>>>> Hi Leonard,
>>>>>>
>>>>>> If the SplitEnumerator receives all splits after a restart, it becomes straightforward to clear and un-assign the unmatched splits (checking whether each split matches the source options). However, a key question arises: should we automatically discard obsolete splits, or explicitly notify the user via an exception?
>>>>>>
>>>>>> We provide an option `scan.partition-unsubscribe.strategy`:
>>>>>> 1. If Strict, it throws an exception when encountering removed splits.
>>>>>> 2. If Lenient, it automatically removes obsolete splits silently.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
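
To illustrate the two strategies above, a sketch of what the option could do on restore (the enum and method below are illustrative, not the final API):

    // Sketch of `scan.partition-unsubscribe.strategy`: decide what to do with
    // restored splits whose topic/table is no longer subscribed.
    import java.util.List;
    import java.util.function.Predicate;
    import java.util.stream.Collectors;

    enum PartitionUnsubscribeStrategy { STRICT, LENIENT }

    class ObsoleteSplitFilter {
        static List<String> filter(
                List<String> restoredSplits,
                Predicate<String> stillSubscribed,
                PartitionUnsubscribeStrategy strategy) {
            List<String> obsolete = restoredSplits.stream()
                    .filter(stillSubscribed.negate())
                    .collect(Collectors.toList());
            if (!obsolete.isEmpty() && strategy == PartitionUnsubscribeStrategy.STRICT) {
                // Strict: surface the removed splits to the user instead of dropping them.
                throw new IllegalStateException("Encountered removed splits: " + obsolete);
            }
            // Lenient: silently drop obsolete splits (an INFO log would fit here).
            return restoredSplits.stream()
                    .filter(stillSubscribed)
                    .collect(Collectors.toList());
        }
    }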
>>>>>>
>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Hongshun for the update and the pretty detailed analysis of the edge cases; the updated FLIP looks good to me now.
>>>>>>>
>>>>>>> Only one last implementation detail about the scenario in the motivation section:
>>>>>>>
>>>>>>> *Restart with changed subscription: during restart, if the source options remove a topic or table, the splits which have already been assigned cannot be removed.*
>>>>>>>
>>>>>>> Could you clarify how we resolve this in the Kafka connector?
>>>>>>>
>>>>>>> Best,
>>>>>>> Leonard
>>>>>>>
>>>>>>> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi devs,
>>>>>>> If there are no further suggestions, I will start the voting tomorrow.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Becket and Leonard,
>>>>>>>>
>>>>>>>> I have updated the content of this FLIP. The key point is that:
>>>>>>>>
>>>>>>>> When the split enumerator receives a split, the split must have already existed in pendingSplitAssignments or assignedSplitAssignments.
>>>>>>>>
>>>>>>>> - If the split is in pendingSplitAssignments, ignore it.
>>>>>>>> - If the split is in assignedSplitAssignments but has a different taskId, ignore it (this indicates it was already assigned to another task).
>>>>>>>> - If the split is in assignedSplitAssignments and shares the same taskId, move the assignment from assignedSplitAssignments to pendingSplitAssignments to re-assign it again.
>>>>>>>>
>>>>>>>> For a better understanding of why these strategies are used, I added some examples and pictures to show it.
>>>>>>>>
>>>>>>>> Would you like to help me check whether there are still some problems?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
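
In pseudo-Java, the three rules above could look like the sketch below (simplified types: a splitId string and an int taskId stand in for the real split and subtask objects):

    // Sketch of the three reconciliation rules, keyed on splitId -> taskId.
    import java.util.HashMap;
    import java.util.Map;

    class ReceivedSplitHandler {
        private final Map<String, Integer> pendingSplitAssignments = new HashMap<>();
        private final Map<String, Integer> assignedSplitAssignments = new HashMap<>();

        void onSplitReceived(String splitId, int taskId) {
            if (pendingSplitAssignments.containsKey(splitId)) {
                return; // rule 1: already pending, ignore
            }
            Integer assignedTask = assignedSplitAssignments.get(splitId);
            if (assignedTask == null || assignedTask != taskId) {
                return; // rule 2: assigned to a different task (or unknown), ignore
            }
            // rule 3: assigned to the same task, move it back to pending for re-assignment
            assignedSplitAssignments.remove(splitId);
            pendingSplitAssignments.put(splitId, taskId);
        }
    }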
>>>>>>>>
>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>
>>>>>>>>> The underlying implementation and communication mechanisms of the Flink Source indeed involve many intricate details. We discussed the issue of split re-assignment in specific scenarios, but fortunately, the final decision turned out to be pretty clear.
>>>>>>>>>
>>>>>>>>> +1 to Becket's proposal, which keeps the framework cleaner and more flexible.
>>>>>>>>> +1 to Hongshun's point to provide comprehensive guidance for connector developers.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>>
>>>>>>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Becket,
>>>>>>>>>
>>>>>>>>> I got it. You're suggesting we should not handle this in the source framework but instead let the split enumerator manage these three scenarios.
>>>>>>>>>
>>>>>>>>> Let me explain why I originally favored handling it in the framework: I'm concerned that connector developers might overlook certain edge cases (after all, we even needed extensive discussions to fully clarify the logic).
>>>>>>>>>
>>>>>>>>> However, your point keeps the framework cleaner and more flexible. Thus, I will take it.
>>>>>>>>>
>>>>>>>>> Perhaps, in this FLIP, we should focus on providing comprehensive guidance for connector developers: explain how to implement a split enumerator, including the underlying challenges and their solutions.
>>>>>>>>>
>>>>>>>>> Additionally, we can use the Kafka connector as a reference implementation to demonstrate the practical steps. This way, developers who want to implement similar connectors can directly reference this example.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> It would be good to not expose runtime details to the source implementation if possible.
>>>>>>>>>>
>>>>>>>>>> Today, the split enumerator implementations are expected to track the split assignment.
>>>>>>>>>>
>>>>>>>>>> Assuming the split enumerator implementation keeps a split assignment map, the enumerator should already know whether a split is assigned or unassigned. So it can handle the three scenarios you mentioned.
>>>>>>>>>>
>>>>>>>>>>> The split is reported by a reader during a global restoration.
>>>>>>>>>>
>>>>>>>>>> The split enumerator should have just been restored / created. If the enumerator expects a full reassignment of splits upon global recovery, there should be no splits assigned to that reader in the split assignment mapping.
>>>>>>>>>>
>>>>>>>>>>> The split is reported by a reader during a partial failure recovery.
>>>>>>>>>>
>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, the split assignment map in the enumerator implementation should already have some split assignments for the reader. Therefore it is a partial failover. If the source supports split reassignment on recovery, the enumerator can assign splits that are different from the reported assignment of that reader in the SplitEnumeratorContext, or it can also assign the same splits. In any case, the enumerator knows that this is a partial recovery because the assignment map is non-empty.
>>>>>>>>>>
>>>>>>>>>>> The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged.
>>>>>>>>>>
>>>>>>>>>> This is actually one of the steps in a partial failure recovery. SplitEnumerator.addSplitsBack() will be called before SplitEnumerator.addReader() is called for the recovered reader. When SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial recovery, and the enumerator should remove these splits from the split assignment map as if they were never assigned.
>>>>>>>>>>
>>>>>>>>>> I think this should work, right?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>
>>>>>>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>>>>>>> I have a concern: the current registeredReaders in SourceCoordinatorContext will be removed after subtaskReset is executed on failure. However, this approach has merit.
>>>>>>>>>>>
>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>
>>>>>>>>>>> 1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>> 2. Enumerator action: assigns split 1 to Reader A, and split 2 to Reader B.
>>>>>>>>>>> 3. Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts.
>>>>>>>>>>> 4. Recovery issue: upon recovery, Reader A re-reports split (1).
>>>>>>>>>>>
>>>>>>>>>>> In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss.
>>>>>>>>>>>
>>>>>>>>>>> Thus, when the enumerator receives a split, the split may originate from three scenarios:
>>>>>>>>>>>
>>>>>>>>>>> 1. The split is reported by a reader during a global restoration.
>>>>>>>>>>> 2. The split is reported by a reader during a partial failure recovery.
>>>>>>>>>>> 3. The split is not reported by a reader, but was assigned after the last successful checkpoint and never acknowledged.
>>>>>>>>>>>
>>>>>>>>>>> In the first scenario (global restore), the split should be re-distributed. For the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask.
>>>>>>>>>>>
>>>>>>>>>>> By implementing a method in the SplitEnumerator context to track each assigned split's status, the system can correctly identify and resolve split ownership in all three scenarios. What about adding a `SplitRecoveryType splitRecoveryType(Split split)` method to SplitEnumeratorContext? SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>
>>>>>>>>>>> What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Hongshun
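
In code, the addition proposed above would be roughly (the enum values are from the proposal; the surrounding interface is abbreviated for illustration):

    // Sketch of the proposed SplitEnumeratorContext addition.
    enum SplitRecoveryType {
        UNASSIGNED,
        GLOBAL_RESTORE,
        PARTIAL_FAILOVER,
        POST_CHECKPOINT_ASSIGNMENT
    }

    interface SplitEnumeratorContextAddition<SplitT> {
        // Lets the enumerator ask how a split it just received came back to it.
        SplitRecoveryType splitRecoveryType(SplitT split);
    }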
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>
>>>>>>>>>>>> The current pattern of handling a new reader registration is the following:
>>>>>>>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>>>>>>> 3. Let the split enumerator get whatever information it wants from the context and do its job.
>>>>>>>>>>>> This pattern decouples the information passing from the reader registration notification. This makes the API extensible - we can add more information (e.g. reported assigned splits in our case) about the reader to the context without introducing new methods.
>>>>>>>>>>>>
>>>>>>>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant to the above pattern. Do we really need it?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>> >
>>>>>>>>>>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>>>>>>>> >
>>>>>>>>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back:
>>>>>>>>>>>> > 1. Job-level restore: the job is restored, and splits from the readers' state are reported via ReaderRegistrationEvent.
>>>>>>>>>>>> > 2. Reader-level restart: a reader is restarted but not the whole job; the splits assigned to it after the last successful checkpoint are returned. This is what addSplitsBack used to do.
>>>>>>>>>>>> >
>>>>>>>>>>>> > In these two situations, the enumerator will choose different strategies:
>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy.
>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between these two scenarios. I used to deprecate the former addSplitsBack(List<SplitT> splits, int subtaskId) and add a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader). Leonard suggested using another method, addSplitsBackOnRecovery, which does not influence the current addSplitsBack.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Best,
>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>> >
>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with default implementation. In this way, we can provide better backward compatibility and also makes it easier for developers to understand.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack() is also only called upon recovery. So the new method seems confusing. One thing worth clarifying is: if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface "SupportSplitReassignmentOnRecovery", right?
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was first proposed. Thank you for your valuable feedback—based on your suggestions, the FLIP has undergone several rounds of revisions.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > Best,
>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>> > > > > Thanks for your advice. It makes sense and I have modified it.
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> I only have one comment on one API design:
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with param isReportedByReader instead. Because the enumerator can apply different reassignment policies based on the context.
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a default implementation? In this way, we can provide better backward compatibility and also make it easier for developers to understand.
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP. A Source implementing this interface indicates that the source operator needs to report splits to the enumerator and receive a reassignment.[1]
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
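
As a rough sketch, the mix-in convention described above could be as small as the following (the operator-side check is illustrative pseudo-logic; sendReaderRegistrationEventWithSplits is a made-up name):

    // Marker interface from the FLIP: a Source implementing it reports its
    // restored splits to the enumerator and waits for a reassignment.
    public interface SupportSplitReassignmentOnRecovery {
    }

    // Illustrative SourceOperator-side behavior:
    // if (source instanceof SupportSplitReassignmentOnRecovery) {
    //     sendReaderRegistrationEventWithSplits(restoredSplits); // report, don't read yet
    // } else {
    //     reader.addSplits(restoredSplits); // legacy behavior unchanged
    // }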
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>>> Hi devs,
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> 1. Not all existing connectors match this rule. For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> 2. Not all enumerators maintain unassigned splits in state. Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits.
Requiring >>>>>>>>>>>> > them >>>>>>>>>>>> > > > to >>>>>>>>>>>> > > > >>>> handle re-registration would add unnecessary >>>>>>>>>>>> complexity. Even >>>>>>>>>>>> > > > though we >>>>>>>>>>>> > > > >>>> maybe implements in kafka connector, currently, >>>>>>>>>>>> kafka connector >>>>>>>>>>>> > is >>>>>>>>>>>> > > > decouple >>>>>>>>>>>> > > > >>>> with flink version, we also need to make sure the >>>>>>>>>>>> elder version >>>>>>>>>>>> > is >>>>>>>>>>>> > > > >>>> compatible. >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> ------------------------------ >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing a >>>>>>>>>>>> new method: >>>>>>>>>>>> > boolean >>>>>>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a >>>>>>>>>>>> default >>>>>>>>>>>> > > > >>>> implementation returning false. This allows source >>>>>>>>>>>> readers to opt >>>>>>>>>>>> > in >>>>>>>>>>>> > > > >>>> to split reassignment only when necessary. Since the >>>>>>>>>>>> new contract >>>>>>>>>>>> > > > already >>>>>>>>>>>> > > > >>>> places the responsibility for split assignment on >>>>>>>>>>>> the enumerator, >>>>>>>>>>>> > not >>>>>>>>>>>> > > > >>>> reporting splits by default is a safe and clean >>>>>>>>>>>> default behavior. >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> ------------------------------ >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> I’ve updated the implementation and the FIP >>>>>>>>>>>> accordingly[1]. It >>>>>>>>>>>> > quite a >>>>>>>>>>>> > > > >>>> big change. In particular, for the Kafka connector, >>>>>>>>>>>> we can now use >>>>>>>>>>>> > a >>>>>>>>>>>> > > > >>>> pluggable SplitPartitioner to support different >>>>>>>>>>>> split assignment >>>>>>>>>>>> > > > >>>> strategies (e.g., default, round-robin). >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> Could you please review it when you have a chance? >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> Best, >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> Hongshun >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> [1] >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin < >>>>>>>>>>>> [email protected]> >>>>>>>>>>>> > > > wrote: >>>>>>>>>>>> > > > >>>> >>>>>>>>>>>> > > > >>>>> Hi Hongshun, >>>>>>>>>>>> > > > >>>>> >>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission cost. >>>>>>>>>>>> Because the >>>>>>>>>>>> > full >>>>>>>>>>>> > > > >>>>> split transmission has to happen in the initial >>>>>>>>>>>> assignment phase >>>>>>>>>>>> > > > already. >>>>>>>>>>>> > > > >>>>> And in the future, we probably want to also >>>>>>>>>>>> introduce some kind >>>>>>>>>>>> > of >>>>>>>>>>>> > > > workload >>>>>>>>>>>> > > > >>>>> balance across source readers, e.g. based on the >>>>>>>>>>>> per-split >>>>>>>>>>>> > > > throughput or >>>>>>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous >>>>>>>>>>>> clusters. 
>>>>>>>>>>>> > > > >>>>> For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they will have problems even today.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss, please see the reply below:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is async. So it is possible that SourceOperator.open() has returned, but the enumerator has not received the split reports. However, because the task status update RPC call goes to the same channel as the split reports call, the task status RPC call will happen after the split reports call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports, then receive the checkpoint request. This "happens before" relationship is kind of important to guarantee the consistent state between enumerator and readers.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source readers will return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> There are other cases where there may be conflicting information between reader and enumerator. For example, consider the following sequence:
>>>>>>>>>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>>>>>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>>>>>>>>> > > > >>>>> 3. Reader A fails before checkpointing. And this is a partial failure, so only reader A restarts.
>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should be the source of truth for split ownership.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
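
A small sketch of what this source-of-truth bookkeeping implies for the enumerator (all names hypothetical): splits reported on registration but not yet reassigned must live in the enumerator's own checkpointed state, otherwise scenario 1 above loses them.

    // Sketch: the enumerator owns returned splits until it re-assigns them.
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    class OwnershipTrackingEnumerator {
        private final Set<String> unassignedSplits = new HashSet<>();

        void onReaderRegistered(List<String> reportedSplits) {
            unassignedSplits.addAll(reportedSplits); // ownership returns to the enumerator
        }

        void onSplitAssigned(String split) {
            unassignedSplits.remove(split); // ownership moves back to a reader
        }

        Set<String> snapshotState() {
            // Splits 3 and 4 from scenario 1 survive the checkpoint here.
            return new HashSet<>(unassignedSplits);
        }
    }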
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits. However, there are a few potential issues.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> 1. High transmission cost. If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission — especially when many splits are involved.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> 2. Risk of split loss. This risk exists unless we have a mechanism to make sure a checkpoint can only happen after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Best,
>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits (not via the response to the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered).
>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>>>>>>>>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the enumerator can access it.
>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the SourceReader from state restoration, but
>>>>>>>>>>>> > [message truncated...]
