Hi devs,

If there are no other concerns, I will start a vote later.
Best,
Hongshun

On Mon, Oct 13, 2025 at 4:17 PM Hongshun Wang <[email protected]> wrote:

> Hi Becket and Leonard,
>
> It seems adding `splitsOnRecovery` to `ReaderInfo` makes the split enumerator simpler and cleaner.
>
> I have modified this FLIP again. Please have a look and let me know what you think.
>
> Best,
> Hongshun
>
> On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]> wrote:
>
>> Hi Becket,
>> Thanks for your explanation.
>>
>> > For the same three input above, the assignment should be consistently the same.
>>
>> That is exactly what troubles me. For assignment algorithms such as hash, it does behave the same. But what if we use round-robin? Given the same reader information, the same split may be assigned to different readers. This is also what I listed as an example:
>>
>> 1. *Initial state:* parallelism 2, 2 splits.
>> 2. *Enumerator action:* Split 1 → Task 1, Split 2 → Task 2.
>> 3. *Failure scenario:* After Split 2 is assigned to Task 2 but before the next checkpoint succeeds, Task 1 restarts.
>> 4. *Recovery issue:* Split 2 is re-added to the enumerator. The round-robin strategy assigns Split 2 to Task 1. Task 1 now has 2 splits, Task 2 has 0 → imbalanced distribution.
>>
>> > Please let me know if you think a meeting would be more efficient.
>> Yes, I'd like to reach an agreement as soon as possible. If you're available, we could schedule a meeting with Leonard as well.
>>
>> Best,
>> Hongshun
>>
>> On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:
>>
>>> Hi Hongshun,
>>>
>>> I am confused. First of all, regardless of what the assignment algorithm is, using SplitEnumeratorContext to return the splits only gives more information than using addSplitsBack(). So there should be no regression.
>>>
>>> Secondly, at this point, the SplitEnumerator should only take the following three inputs to generate the global split assignment:
>>> 1. the *reader information (num readers, locations, etc.)*
>>> 2. *all the splits to assign*
>>> 3. *the configured assignment algorithm*
>>> Preferably, for the same three inputs above, the assignment should be consistently the same. I don't see why it should care about why a new reader is added, whether due to partial failover, global failover, or job restart.
>>>
>>> If you want to do global redistribution on global failover and restart, but honor the existing assignment for partial failover, the enumerator will just do the following:
>>> 1. Generate a new global assignment (global redistribution) in start(), because start() will only be invoked on global failover or restart. That means all the readers are also new with empty assignments.
>>> 2. After the global assignment is generated, it should be honored for the whole life cycle. There might be many reader registrations, again for different reasons, but that does not matter:
>>> - reader registration after a job restart
>>> - reader registration after a global failover
>>> - reader registration due to a partial failover, which may or may not have an addSplitsBack() call.
>>> Regardless of the reason, the split enumerator will just enforce the global assignment it has already generated, i.e. without split redistribution.
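>>>
>>> To make that concrete, here is a minimal sketch of an enumerator following this contract. The class and the discoverAllSplits() hook are illustrative assumptions, not an existing or proposed API:
>>>
>>> import java.util.Collection;
>>> import java.util.Collections;
>>> import java.util.HashMap;
>>> import java.util.List;
>>> import java.util.Map;
>>> import java.util.stream.Collectors;
>>> import org.apache.flink.api.connector.source.SourceSplit;
>>> import org.apache.flink.api.connector.source.SplitEnumerator;
>>> import org.apache.flink.api.connector.source.SplitEnumeratorContext;
>>> import org.apache.flink.api.connector.source.SplitsAssignment;
>>>
>>> abstract class StickyEnumerator<SplitT extends SourceSplit, CkptT>
>>>         implements SplitEnumerator<SplitT, CkptT> {
>>>
>>>     private final SplitEnumeratorContext<SplitT> context;
>>>     // Source of truth: splitId -> owning subtask, generated once in start().
>>>     private final Map<String, Integer> globalAssignment = new HashMap<>();
>>>     private final Map<String, SplitT> splitsById = new HashMap<>();
>>>
>>>     StickyEnumerator(SplitEnumeratorContext<SplitT> context) {
>>>         this.context = context;
>>>     }
>>>
>>>     // Hypothetical discovery hook; a real source would query Kafka, etc.
>>>     protected abstract Collection<SplitT> discoverAllSplits();
>>>
>>>     @Override
>>>     public void start() {
>>>         // start() only runs on job restart / global failover, so every reader
>>>         // is new and we may redistribute freely, e.g. hash by split id.
>>>         for (SplitT split : discoverAllSplits()) {
>>>             splitsById.put(split.splitId(), split);
>>>             globalAssignment.put(
>>>                     split.splitId(),
>>>                     Math.floorMod(split.splitId().hashCode(), context.currentParallelism()));
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void addReader(int subtaskId) {
>>>         // Whatever the reason for the registration (restart, global or partial
>>>         // failover), enforce the assignment generated in start().
>>>         List<SplitT> owned = globalAssignment.entrySet().stream()
>>>                 .filter(e -> e.getValue() == subtaskId)
>>>                 .map(e -> splitsById.get(e.getKey()))
>>>                 .collect(Collectors.toList());
>>>         if (!owned.isEmpty()) {
>>>             context.assignSplits(
>>>                     new SplitsAssignment<>(Collections.singletonMap(subtaskId, owned)));
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public void addSplitsBack(List<SplitT> splits, int subtaskId) {
>>>         // These splits still belong to subtaskId per globalAssignment; they are
>>>         // re-delivered when the recovered reader registers again.
>>>     }
>>> }
>>>
>>> With a deterministic function of the three inputs like the hash above, re-running the assignment always yields the same result.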
>>>
>>> Wouldn't that give the behavior you want? I feel the discussion somehow goes in circles. Please let me know if you think a meeting would be more efficient.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> > Ignore a returned split if it has been assigned to a different reader, otherwise put it back to unassigned splits / pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This should work regardless of whether it is a global failover, partial failover, restart, etc. There is no need for the SplitEnumerator to distinguish what failover scenario it is.
>>>>
>>>> In this case, it seems that global failover and partial failover share the same distribution strategy if a split has not been assigned to a different reader. However, splits need to be redistributed on global failover (this is why we need this FLIP), while on partial failover they do not. I do not see how we distinguish the two.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> The problem we are trying to solve here is to give the splits back to the SplitEnumerator. There are only two types of splits to give back:
>>>>> 1) Splits whose assignment has been checkpointed. In this case, we rely on addReader() + SplitEnumeratorContext to give the splits back; this provides more information associated with those splits.
>>>>> 2) Splits whose assignment has not been checkpointed. In this case, we use addSplitsBack(); there is no reader info to give because the previous assignment did not take effect to begin with.
>>>>>
>>>>> From the SplitEnumerator implementation perspective, the contract is straightforward:
>>>>> 1. The SplitEnumerator is the source of truth for assignment.
>>>>> 2. When the enumerator receives the addSplitsBack() call, it always adds these splits back to unassigned splits / pending splits.
>>>>> 3. When the enumerator receives the addReader() call, that means the reader has no current assignment and has returned its previous assignment based on the reader-side info. The SplitEnumerator checks the SplitEnumeratorContext to retrieve the returned splits from that reader (i.e. the previous assignment) and handles them according to its own source-of-truth knowledge of the assignment: ignore a returned split if it has been assigned to a different reader, otherwise put it back to unassigned splits / pending splits. Then the enumerator assigns new splits to the newly added reader, which may use the previous assignment as a reference. This should work regardless of whether it is a global failover, partial failover, restart, etc. There is no need for the SplitEnumerator to distinguish what failover scenario it is.
>>>>>
>>>>> Would this work?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <[email protected]> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>> > why do we need to change the behavior of addSplitsBack()? Should it remain the same?
>>>>>>
>>>>>> How does the enumerator get the splits from the ReaderRegistrationEvent and then reassign them?
>>>>>>
>>>>>> You gave this advice before:
>>>>>> > 1. Put all the reader information in the SplitEnumerator context.
>>>>>> > 2. Notify the enumerator about the new reader registration.
>>>>>> > 3. Let the split enumerator get whatever information it wants from the context and do its job.
>>>>>>
>>>>>> However, each time a source task fails over, the ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>> registeredReaders removes that reader's info. When the source task is registered again, the info is added again. *Thus, registeredReaders cannot tell whether a reader was registered before.*
>>>>>>
>>>>>> Therefore, enumerator#addReader does not distinguish the following three situations:
>>>>>> 1. The reader is registered during a global restart. In this case, redistribute the splits from the infos (take all the splits off the ReaderInfo).
>>>>>> 2. The reader is registered during a partial failover (before the first successful checkpoint). In this case, ignore the splits from the infos (leave all the splits on the ReaderInfo).
>>>>>> 3. The reader is registered during a partial failover (after the first successful checkpoint). In this case, we need to assign the splits to the same reader again (take all the splits off the ReaderInfo but assign them to it again).
>>>>>> We still need the enumerator to distinguish them (using pendingSplitAssignments & assignedSplitAssignments). However, it is redundant to maintain split assignment information both in the enumerator and in the enumerator context.
>>>>>>
>>>>>> I think if we change the behavior of addSplitsBack, it will be simpler: just let the enumerator handle these splits based on pendingSplitAssignments & assignedSplitAssignments.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]> wrote:
>>>>>>
>>>>>>> Hi Hongshun,
>>>>>>>
>>>>>>> Thanks for updating the FLIP. A quick question: why do we need to change the behavior of addSplitsBack()? Should it remain the same?
>>>>>>>
>>>>>>> Regarding the case of restart with a changed subscription, I think the only correct behavior is removing obsolete splits without any warning / exception. It is OK to add info-level logging if we want to. The intention is clear if the user has explicitly changed the subscription and restarted the job. There is no need to add a config to double-confirm.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Leonard,
>>>>>>>>
>>>>>>>> If the SplitEnumerator receives all splits after a restart, it becomes straightforward to clear and un-assign the unmatched splits (checking whether each split still matches the source options). However, a key question arises: *should we automatically discard obsolete splits, or explicitly notify the user via an exception?*
>>>>>>>>
>>>>>>>> We provided an option `scan.partition-unsubscribe.strategy` (sketched below):
>>>>>>>> 1. If Strict, throw an exception when encountering removed splits.
>>>>>>>> 2. If Lenient, automatically remove obsolete splits silently.
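>>>>>>>>
>>>>>>>> For illustration, the handling could look roughly like this (the option name comes from above; the enum, the helper method, and the fields here are hypothetical):
>>>>>>>>
>>>>>>>> enum UnsubscribeStrategy { STRICT, LENIENT }
>>>>>>>>
>>>>>>>> // Called once after restart, when the enumerator has received all splits.
>>>>>>>> void pruneObsoleteSplits(List<MySplit> restoredSplits, UnsubscribeStrategy strategy) {
>>>>>>>>     for (MySplit split : restoredSplits) {
>>>>>>>>         if (matchesSourceOptions(split)) {      // hypothetical check against source options
>>>>>>>>             pendingSplitAssignments.add(split); // keep it and re-assign later
>>>>>>>>         } else if (strategy == UnsubscribeStrategy.STRICT) {
>>>>>>>>             throw new IllegalStateException(
>>>>>>>>                     "Split " + split.splitId() + " no longer matches the subscription");
>>>>>>>>         }
>>>>>>>>         // LENIENT: silently drop the obsolete split.
>>>>>>>>     }
>>>>>>>> }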
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> Thanks Hongshun for the update and the pretty detailed analysis of the edge cases; the updated FLIP looks good to me now.
>>>>>>>>>
>>>>>>>>> Only one last implementation detail about the scenario in the motivation section:
>>>>>>>>>
>>>>>>>>> *Restart with changed subscription: During restart, if the source options remove a topic or table, the splits which have already been assigned cannot be removed.*
>>>>>>>>>
>>>>>>>>> Could you clarify how we resolve this in the Kafka connector?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>>
>>>>>>>>> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi devs,
>>>>>>>>> If there are no further suggestions, I will start the voting tomorrow.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>
>>>>>>>>>> I have updated the content of this FLIP. The key point is:
>>>>>>>>>>
>>>>>>>>>> When the split enumerator receives a split, *the split must already exist in pendingSplitAssignments or assignedSplitAssignments*.
>>>>>>>>>>
>>>>>>>>>> - If the split is in pendingSplitAssignments, ignore it.
>>>>>>>>>> - If the split is in assignedSplitAssignments but has a different taskId, ignore it (this indicates it was already assigned to another task).
>>>>>>>>>> - If the split is in assignedSplitAssignments and shares the same taskId, move the assignment from assignedSplitAssignments to pendingSplitAssignments to be re-assigned.
>>>>>>>>>>
>>>>>>>>>> To better explain why these strategies are used, I added some examples and pictures.
>>>>>>>>>>
>>>>>>>>>> Would you like to help me check whether there are still some problems?
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>>>
>>>>>>>>>>> The underlying implementation and communication mechanisms of the Flink Source indeed involve many intricate details. We discussed the issue of split re-assignment in specific scenarios, but fortunately, the final decision turned out to be pretty clear.
>>>>>>>>>>>
>>>>>>>>>>> +1 to Becket's proposal that keeps the framework cleaner and more flexible.
>>>>>>>>>>> +1 to Hongshun's point to provide comprehensive guidance for connector developers.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>>
>>>>>>>>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>
>>>>>>>>>>> I got it. You're suggesting we should not handle this in the source framework but instead let the split enumerator manage these three scenarios.
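>>>>>>>>>>>
>>>>>>>>>>> A minimal sketch of that handling with the enumerator's own two maps (names illustrative, not the FLIP's final API):
>>>>>>>>>>>
>>>>>>>>>>> // splitId -> split awaiting assignment, and splitId -> owning subtask.
>>>>>>>>>>> Map<String, MySplit> pendingSplitAssignments = new HashMap<>();
>>>>>>>>>>> Map<String, Integer> assignedSplitAssignments = new HashMap<>();
>>>>>>>>>>>
>>>>>>>>>>> void onSplitReturned(MySplit split, int reportingTask) {
>>>>>>>>>>>     String id = split.splitId();
>>>>>>>>>>>     if (pendingSplitAssignments.containsKey(id)) {
>>>>>>>>>>>         return; // already awaiting re-assignment: ignore the report
>>>>>>>>>>>     }
>>>>>>>>>>>     Integer owner = assignedSplitAssignments.get(id);
>>>>>>>>>>>     if (owner == null || owner != reportingTask) {
>>>>>>>>>>>         return; // unknown, or already re-assigned to another task: ignore
>>>>>>>>>>>     }
>>>>>>>>>>>     // The same task reported its own split back: queue it for re-assignment.
>>>>>>>>>>>     assignedSplitAssignments.remove(id);
>>>>>>>>>>>     pendingSplitAssignments.put(id, split);
>>>>>>>>>>> }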
>>>>>>>>>>>
>>>>>>>>>>> Let me explain why I originally favored handling it in the framework: I'm concerned that connector developers might overlook certain edge cases (after all, it even took us extensive discussion to fully clarify the logic).
>>>>>>>>>>>
>>>>>>>>>>> However, your approach keeps the framework cleaner and more flexible. Thus, I will take it.
>>>>>>>>>>>
>>>>>>>>>>> Perhaps, in this FLIP, we should focus on providing comprehensive guidance for connector developers: explain how to implement a split enumerator, including the underlying challenges and their solutions.
>>>>>>>>>>>
>>>>>>>>>>> Additionally, we can use the Kafka connector as a reference implementation to demonstrate the practical steps. This way, developers who want to implement similar connectors can directly reference this example.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It would be good to not expose runtime details to the source implementation if possible.
>>>>>>>>>>>>
>>>>>>>>>>>> Today, the split enumerator implementations are expected to track the split assignment.
>>>>>>>>>>>>
>>>>>>>>>>>> Assuming the split enumerator implementation keeps a split assignment map, the enumerator should already know whether a split is assigned or unassigned. So it can handle the three scenarios you mentioned.
>>>>>>>>>>>>
>>>>>>>>>>>> > The split is reported by a reader during a global restoration.
>>>>>>>>>>>>
>>>>>>>>>>>> The split enumerator should have just been restored / created. If the enumerator expects a full reassignment of splits upon global recovery, there should be no splits assigned to that reader in the split assignment mapping.
>>>>>>>>>>>>
>>>>>>>>>>>> > The split is reported by a reader during a partial failure recovery.
>>>>>>>>>>>>
>>>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, the split assignment map in the enumerator implementation should already have some split assignments for the reader. Therefore it is a partial failover. If the source supports split reassignment on recovery, the enumerator can assign splits that differ from the reported assignment of that reader in the SplitEnumeratorContext, or it can assign the same splits. In any case, the enumerator knows that this is a partial recovery because the assignment map is non-empty.
>>>>>>>>>>>>
>>>>>>>>>>>> > The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>>
>>>>>>>>>>>> This is actually one of the steps in partial failure recovery. SplitEnumerator.addSplitsBack() will be called before SplitEnumerator.addReader() is called for the recovered reader. When SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial recovery, and the enumerator should remove these splits from the split assignment map as if they were never assigned.
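>>>>>>>>>>>>
>>>>>>>>>>>> In pseudo-Java, the detection boils down to something like this (a sketch assuming the enumerator keeps a per-subtask assignment map; names are illustrative):
>>>>>>>>>>>>
>>>>>>>>>>>> // The enumerator's own map is enough to tell the scenarios apart.
>>>>>>>>>>>> Map<Integer, Set<String>> assignedSplitsBySubtask = new HashMap<>();
>>>>>>>>>>>>
>>>>>>>>>>>> public void addSplitsBack(List<MySplit> splits, int subtaskId) {
>>>>>>>>>>>>     // Only happens on partial failover: forget the un-checkpointed assignment.
>>>>>>>>>>>>     Set<String> assigned =
>>>>>>>>>>>>             assignedSplitsBySubtask.getOrDefault(subtaskId, new HashSet<>());
>>>>>>>>>>>>     splits.forEach(s -> assigned.remove(s.splitId()));
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> public void addReader(int subtaskId) {
>>>>>>>>>>>>     boolean partialFailover = !assignedSplitsBySubtask
>>>>>>>>>>>>             .getOrDefault(subtaskId, Collections.emptySet()).isEmpty();
>>>>>>>>>>>>     // Empty entry     => fresh start / global recovery: free to redistribute.
>>>>>>>>>>>>     // Non-empty entry => partial failover: re-assign the same splits, or not.
>>>>>>>>>>>> }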
>>>>>>>>>>>>
>>>>>>>>>>>> I think this should work, right?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>>>>>>>>> I have a concern: the current registeredReaders in the *SourceCoordinatorContext will be removed after subtaskReset is executed on failure*. However, this approach has merit.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>>>> 2. Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader B.
>>>>>>>>>>>>> 3. Failure scenario: Reader A fails before checkpointing. Since this is a partial failure, only Reader A restarts.
>>>>>>>>>>>>> 4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>>>>>>>>>>
>>>>>>>>>>>>> In my previous design, the enumerator would ignore Reader A's re-registration, which would cause data loss.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thus, when the enumerator receives a split, the split may originate from three scenarios:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. The split is reported by a reader during a global restoration.
>>>>>>>>>>>>> 2. The split is reported by a reader during a partial failure recovery.
>>>>>>>>>>>>> 3. The split is not reported by a reader, but is assigned after the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the first scenario (global restore), the split should be re-distributed. For the latter two scenarios (partial failover and post-checkpoint assignment), we need to reassign the split to its originally assigned subtask.
>>>>>>>>>>>>>
>>>>>>>>>>>>> By implementing a method in the SplitEnumerator context to track each assigned split's status, the system can correctly identify and resolve split ownership in all three scenarios. *What about adding a `SplitRecoveryType splitRecoveryType(Split split)` to SplitEnumeratorContext?* SplitRecoveryType is an enum including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you think? Are there any details or scenarios I haven't considered? Looking forward to your advice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The current pattern of handling new reader registration is the following:
>>>>>>>>>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>>>>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>>>>>>>>> 3. Let the split enumerator get whatever information it wants from the context and do its job (sketched below).
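>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For example (a sketch: registeredReaders() is the existing SplitEnumeratorContext method, while splitsOnRecovery() is only the accessor proposed in this thread, not an existing API):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> public void addReader(int subtaskId) {
>>>>>>>>>>>>>>     // Step 3 of the pattern: pull whatever the context knows about the reader.
>>>>>>>>>>>>>>     ReaderInfo info = context.registeredReaders().get(subtaskId);
>>>>>>>>>>>>>>     List<MySplit> reported = info.splitsOnRecovery(); // proposed accessor
>>>>>>>>>>>>>>     // ... decide the assignment, then push it via context.assignSplits(...)
>>>>>>>>>>>>>> }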
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This pattern decouples the information passing from the reader registration notification. This makes the API extensible: we can add more information (e.g. the reported assigned splits in our case) about the reader to the context without introducing new methods.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant to the above pattern. Do we really need it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator receives splits being added back:
>>>>>>>>>>>>>> > 1. Job-level restore: the job is restored, and splits from the readers' state are reported by the ReaderRegistrationEvent.
>>>>>>>>>>>>>> > 2. Reader-level restart: a reader is restarted but not the whole job, and the splits assigned to it after the last successful checkpoint are added back. This is what addSplitsBack used to do.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > In these two situations, the enumerator will choose different strategies:
>>>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed across readers according to the current partitioner strategy.
>>>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to the same reader they were originally assigned to, preserving locality and avoiding unnecessary redistribution.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between these two scenarios. I originally proposed deprecating the former addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
>>>>>>>>>>>>>> > Leonard suggested using another method, addSplitsBackOnRecovery, which does not affect the current addSplitsBack.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Best
>>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a default implementation? In this way, we can provide better backward compatibility and it also makes it easier for developers to understand.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > I am curious what the enumerator would do differently for the splits added via addSplitsBackOnRecovery() vs. addSplitsBack(). Today, addSplitsBack() is also only called upon recovery, so the new method seems confusing. One thing worth clarifying is: if the Source implements SupportSplitReassignmentOnRecovery, upon recovery, should the splits reported by the readers also be added back to the SplitEnumerator via the addSplitsBack() call? Or should the SplitEnumerator explicitly query the registered reader information via the SplitEnumeratorContext to get the originally assigned splits when addReader() is invoked? I was assuming the latter in the beginning, so the behavior of addSplitsBack() remains unchanged, but I am not opposed to doing the former.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility issue you see if we do not have a separate addSplitsBackOnRecovery() method? Even without this new method, the behavior remains exactly the same unless the end users implement the mix-in interface "SupportSplitReassignmentOnRecovery", right?
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > It has been quite some time since this FLIP [1] was first proposed. Thank you for your valuable feedback; based on your suggestions, the FLIP has undergone several rounds of revisions.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there are no further concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > Best
>>>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>>>> > > > > Thanks for your advice. It makes sense and I have modified it.
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> I only have one comment on one API design:
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack method and use an addSplitsBack with the param isReportedByReader instead, because the enumerator can apply different reassignment policies based on the context.
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a default implementation? In this way, we can provide better backward compatibility, and it also makes it easier for developers to understand.
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> I think that's a great idea! I have added the SupportSplitReassignmentOnRecovery interface in this FLIP. A Source implementing this interface indicates that the source operator needs to report splits to the enumerator and receive a reassignment. [1]
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> I think the convention for such optional features in Source is via mix-in interfaces. So instead of adding a method to the SourceReader, maybe we should introduce an interface SupportSplitReassignmentOnRecovery with this method. If a Source implementation implements that interface, then the SourceOperator will check the desired behavior and act accordingly.
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>>> Hi devs,
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback and suggestions.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good sense to me and effectively addresses the issues I encountered at the beginning of the design.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by default, primarily for compatibility and practical reasons:
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> > For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either as they will have problems even today.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> 1. Not all existing connectors match this rule. For example, in the MySQL CDC connector, a binlog split may contain hundreds (or even more) snapshot split completion records. This state is large and is currently transmitted incrementally through multiple BinlogSplitMetaEvent messages. Since the binlog reader operates with single parallelism, reporting the full split state on recovery could be inefficient or even infeasible. For such sources, it would be better to provide a mechanism to skip split reporting during restart until they redesign and reduce the split size.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> 2. Not all enumerators maintain unassigned splits in state. Some SplitEnumerator implementations (such as the Kafka connector's) do not track or persistently manage unassigned splits. Requiring them to handle re-registration would add unnecessary complexity. Even if we implement this in the Kafka connector, the Kafka connector is currently decoupled from the Flink version, so we also need to make sure older versions remain compatible.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing a new method: boolean SourceReader#shouldReassignSplitsOnRecovery() with a default implementation returning false. This allows source readers to opt in to split reassignment only when necessary. Since the new contract already places the responsibility for split assignment on the enumerator, not reporting splits by default is a safe and clean default behavior.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> I've updated the implementation and the FLIP accordingly [1]. It is quite a big change. In particular, for the Kafka connector, we can now use a pluggable SplitPartitioner to support different split assignment strategies (e.g., default, round-robin).
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission cost, because the full split transmission has to happen in the initial assignment phase already. And in the future, we probably want to also introduce some kind of workload balance across source readers, e.g. based on per-split throughput or per-source-reader workload in heterogeneous clusters. For these reasons, we do not expect the Split objects to be huge, and we are not trying to design for huge Split objects either, as they would have problems even today.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss; please see the replies below.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which means the task is still in the initializing state; therefore the checkpoint should not be triggered until the enumerator receives all the split reports.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is async. So it is possible that SourceOperator.open() has returned, but the enumerator has not received the split reports. However, because the task status update RPC call goes through the same channel as the split report call, the task status RPC call will happen after the split report call on the JM side. Therefore, on the JM side, the SourceCoordinator will always first receive the split reports, then receive the checkpoint request. This happens-before relationship is important to guarantee consistent state between the enumerator and the readers.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator implementation handle this. That means if the enumerator relies on the initial split reports from the source readers, it should maintain these reports by itself. In the above example, the enumerator will need to remember that 3 and 4 are not assigned and put them into its own state.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned to the SourceReaders is completely owned by the SourceReaders. Enumerators can remember the assignments but cannot change them, even when the source reader recovers / restarts.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the source readers return the ownership of the splits to the enumerator. So the enumerator is responsible for maintaining these splits until they are assigned to a source reader again.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> There are other cases where there may be conflicting information between reader and enumerator. For example, consider the following sequence:
>>>>>>>>>>>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>>>>>>>>>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to reader B.
>>>>>>>>>>>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial failure, so only reader A restarts.
>>>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to the enumerator.
>>>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has assigned splits (1 and 2) to reader B.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should be the source of truth for split ownership.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was also mentioned in this FLIP), since it would allow more flexibility in reassigning all splits.
>>>>>>>>>>>>>> > > > >>>>>> However, there are a few potential issues.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>>>>>>>>> > > > >>>>>> If we pass the full split objects (rather than just split IDs), the data size could be significant, leading to high overhead during transmission, especially when many splits are involved.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>>>>>>>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism to make sure a checkpoint can only happen after all the splits are reassigned. There are scenarios where splits could be lost due to inconsistent state handling during recovery:
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and Reader B reports (3 and 4).
>>>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1 and 2, not 3 and 4.
>>>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2 are recorded in the reader states; splits 3 and 4 are not persisted.
>>>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4 will be permanently lost.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4) upon restart.
>>>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty states.
>>>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate these risks!
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Best
>>>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated FLIP wiki, it would be good to see if we can keep the protocol simple. One alternative way to achieve this behavior is the following:
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends a ReaderRegistrationEvent with the currently assigned splits to the enumerator. It does not add these splits to the SourceReader.
>>>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use SplitEnumeratorContext.assignSplits() to assign the splits (not via the response to the ReaderRegistrationEvent; this allows async split assignment in case the enumerator wants to wait until all the readers are registered).
>>>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when it receives the AddSplitEvent from the enumerator.
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon restart.
>>>>>>>>>>>>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator can access it.
>>>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the SourceReader from state restoration, but
>>>>>>>>>>>>>> > [message truncated...]
