Hi devs,

If there are no further concerns, I will start the vote later.
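
For anyone catching up on the thread, here is a rough, simplified sketch of the enumerator-side contract discussed in the quoted messages below: addSplitsBack() returns splits to the unassigned pool, and addReader() ignores a reported split that is already owned by another reader and otherwise puts it back for assignment. This is not the FLIP's code. `EnumeratorSketch`, the integer split ids and the modulo policy are assumptions made only for illustration, and `splitsOnRecovery` is passed as a plain argument here rather than read from `ReaderInfo`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of the contract; the class and its methods are hypothetical, not Flink API. */
class EnumeratorSketch {
    private final int parallelism;
    // Source of truth: split id -> subtask that currently owns it.
    private final Map<Integer, Integer> assigned = new HashMap<>();
    // Splits waiting for their target reader to register.
    private final Set<Integer> unassigned = new LinkedHashSet<>();

    EnumeratorSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /** Splits assigned after the last successful checkpoint: treat them as never assigned. */
    void addSplitsBack(List<Integer> splits, int subtaskId) {
        for (Integer split : splits) {
            assigned.remove(split, subtaskId); // removes only if still owned by subtaskId
            unassigned.add(split);
        }
    }

    /** A reader (re-)registers and reports the splits restored from its own state. */
    List<Integer> addReader(int subtaskId, List<Integer> splitsOnRecovery) {
        for (Integer split : splitsOnRecovery) {
            Integer owner = assigned.get(split);
            if (owner != null && owner != subtaskId) {
                continue; // already owned by another reader: ignore the stale report
            }
            assigned.remove(split);
            unassigned.add(split);
        }
        // Assumed deterministic policy (split id modulo parallelism), so the same
        // inputs always yield the same assignment.
        List<Integer> newlyAssigned = new ArrayList<>();
        for (Iterator<Integer> it = unassigned.iterator(); it.hasNext(); ) {
            int split = it.next();
            if (Math.floorMod(split, parallelism) == subtaskId) {
                it.remove();
                assigned.put(split, subtaskId);
                newlyAssigned.add(split);
            }
        }
        return newlyAssigned;
    }

    public static void main(String[] args) {
        EnumeratorSketch enumerator = new EnumeratorSketch(2);
        // Global restore: the assignment map is empty, so reported splits are redistributed.
        System.out.println(enumerator.addReader(0, List.of(1, 2)));
        System.out.println(enumerator.addReader(1, List.of()));
        // Partial failover of reader 0: its restored state still lists both splits,
        // but split 1 now belongs to reader 1 and is ignored.
        System.out.println(enumerator.addReader(0, List.of(1, 2)));
    }
}
```

In this toy run the enumerator never needs to know which failover scenario it is in; it only consults its own assignment map, which is the point made in the discussion below.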

Best,
Hongshun

On Mon, Oct 13, 2025 at 4:17 PM Hongshun Wang <[email protected]>
wrote:

> Hi Becket and Leonard,
>
> It seems that adding `splitsOnRecovery` to `ReaderInfo` makes the split
> enumerator simpler and cleaner.
>
> I have modified this FLIP again. Please have a look and let me know what
> you think.
>
> Best,
> Hongshun
>
> On Mon, Oct 13, 2025 at 10:48 AM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi Becket,
>> Thanks for your explanation.
>>
>> > For the same three inputs above, the assignment should be consistently
>> the same.
>>
>> That is exactly what troubles me. For assignment algorithms such as hash,
>> it does behave the same. But what if we use round-robin? Even with the
>> same reader information, the same split may be assigned to different
>> readers. This is the example I listed before:
>>
>>    1. *Initial state:* parallelism 2, 2 splits.
>>    2. *Enumerator action:* Split 1 → Task 1, Split 2 → Task 2.
>>    3. *Failure scenario:* After Split 2 is assigned to Task 2 but before
>>    the next checkpoint succeeds, Task 1 restarts.
>>    4. *Recovery issue:* Split 2 is re-added to the enumerator. The
>>    round-robin strategy assigns Split 2 to Task 1. Task 1 now has 2
>>    splits and Task 2 has 0 → imbalanced distribution.
>>
>>
>> > Please let me know if you think a meeting would be more efficient.
>> Yes, I’d like to reach an agreement as soon as possible. If you’re
>> available, we could schedule a meeting with Leonard as well.
>>
>> Best,
>> Hongshun
>>
>> On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:
>>
>>> Hi Hongshun,
>>>
>>> I am confused. First of all, regardless of what the assignment algorithm
>>> is, using SplitEnumeratorContext to return the splits only gives more
>>> information than using addSplitsBack(). So there should be no regression.
>>>
>>> Secondly, at this point, the SplitEnumerator should only take the
>>> following three inputs to generate the global split assignment:
>>> 1. the *reader information (num readers, locations, etc.)*
>>> 2. *all the splits to assign*
>>> 3. the *configured assignment algorithm*
>>> Preferably, for the same three inputs above, the assignment should be
>>> consistently the same. I don't see why it should care about why a new
>>> reader is added, whether due to partial failover or global failover or job
>>> restart.
>>>
>>> If you want to do global redistribution on global failover and restart,
>>> but honor the existing assignment for partial failover, the enumerator
>>> will just do the following:
>>> 1. Generate a new global assignment (global redistribution) in start(),
>>> because start() will only be invoked on global failover or restart. That
>>> means all the readers are also new, with empty assignments.
>>> 2. After the global assignment is generated, it should be honored for
>>> the whole life cycle. There might be many reader registrations, again for
>>> different reasons, but the reason does not matter:
>>>     - reader registration after this job restart
>>>     - reader registration after this global failover
>>>     - reader registration due to partial failover, which may or may not
>>> have an addSplitsBack() call.
>>>     Regardless of the reason, the split enumerator will just enforce the
>>> global assignment it has already generated, i.e. without split
>>> redistribution.
>>>
>>> Wouldn't that give the behavior you want? I feel the discussion is somehow
>>> going in circles. Please let me know if you think a meeting would be more
>>> efficient.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>>> Hi Becket,
>>>>
>>>> > Ignore a returned split if it has been assigned to a different
>>>> reader, otherwise put it back to unassigned splits / pending splits. Then
>>>> the enumerator assigns new splits to the newly added reader, which may use
>>>> the previous assignment as a reference. This should work regardless of
>>>> whether it is a global failover, partial failover, restart, etc. There is
>>>> no need for the SplitEnumerator to distinguish what failover scenario it
>>>> is.
>>>>
>>>> In this case, it seems that global failover and partial failover share
>>>> the same distribution strategy if the split has not been assigned to a
>>>> different reader. However, splits need to be redistributed on global
>>>> failover (this is why we need this FLIP), but not on partial failover.
>>>> I have no idea how we would distinguish them.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> The problem we are trying to solve here is to give the splits back to
>>>>> the SplitEnumerator. There are only two types of splits to give back:
>>>>> 1) splits whose assignment has been checkpointed. - In this case, we
>>>>> rely on addReader() + SplitEnumeratorContext to give the splits back, this
>>>>> provides more information associated with those splits.
>>>>> 2) splits whose assignment has not been checkpointed. -  In this case,
>>>>> we use addSplitsBack(), there is no reader info to give because the
>>>>> previous assignment did not take effect to begin with.
>>>>>
>>>>> From the SplitEnumerator implementation perspective, the contract is
>>>>> straightforward.
>>>>> 1. The SplitEnumerator is the source of truth for assignment.
>>>>> 2. When the enumerator receives the addSplitsBack() call, it always adds
>>>>> these splits back to unassigned splits / pending splits.
>>>>> 3. When the enumerator receives the addReader() call, that means the
>>>>> reader has no current assignment, and has returned its previous assignment
>>>>> based on the reader side info. The SplitEnumerator checks the
>>>>> SplitEnumeratorContext to retrieve the returned splits from that reader
>>>>> (i.e. previous assignment) and handle them according to its own source of
>>>>> truth knowledge of assignment - Ignore a returned split if it has been
>>>>> assigned to a different reader, otherwise put it back to unassigned splits
>>>>> / pending splits. Then the enumerator assigns new splits to the newly
>>>>> added reader, which may use the previous assignment as a reference. This should
>>>>> work regardless of whether it is a global failover, partial failover,
>>>>> restart, etc. There is no need for the SplitEnumerator to distinguish what
>>>>> failover scenario it is.
>>>>>
>>>>> Would this work?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Becket,
>>>>>>  > why do we need to change the behavior of addSplitsBack()? Should
>>>>>> it remain the same?
>>>>>>
>>>>>> How does the enumerator get the splits from ReaderRegistrationEvent
>>>>>> and then reassign it?
>>>>>>
>>>>>> You gave this advice before:
>>>>>> > 1. Put all the reader information in the SplitEnumerator context.
>>>>>> 2. Notify the enumerator about the new reader registration. 3. Let the
>>>>>> split enumerator get whatever information it wants from the context and
>>>>>> do its job.
>>>>>>
>>>>>> However, each time a source task fails over, the
>>>>>> `ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>>>> registeredReaders` removes that reader's info. When the source task
>>>>>> registers again, the info is added again. *Thus, registeredReaders
>>>>>> cannot tell whether the reader was registered before.*
>>>>>>
>>>>>> Therefore, enumerator#addReader cannot distinguish the following three
>>>>>> situations:
>>>>>> 1. The reader registers during a global restart. In this case, the
>>>>>> splits from the reader info should be redistributed (take all the splits
>>>>>> off ReaderInfo).
>>>>>> 2. The reader registers during a partial failover (before the first
>>>>>> successful checkpoint). In this case, the splits from the reader info
>>>>>> should be ignored (leave all the splits from ReaderInfo alone).
>>>>>> 3. The reader registers during a partial failover (after the first
>>>>>> successful checkpoint). In this case, we need to assign the splits to the
>>>>>> same reader again (take all the splits off ReaderInfo but assign them to
>>>>>> it again).
>>>>>> We still need the enumerator to distinguish them (using
>>>>>> pendingSplitAssignment & assignedSplitAssignment). However, it is
>>>>>> redundant to maintain split assignment information both in the
>>>>>> enumerator and in the enumerator context.
>>>>>>
>>>>>> I think it will be simpler if we change the behavior of addSplitsBack:
>>>>>> just let the enumerator handle these splits based on
>>>>>> pendingSplitAssignment & assignedSplitAssignment.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Hongshun,
>>>>>>>
>>>>>>> Thanks for updating the FLIP. A quick question: why do we need to
>>>>>>> change the behavior of addSplitsBack()? Should it remain the same?
>>>>>>>
>>>>>>> Regarding the case of restart with changed subscription, I think the
>>>>>>> only correct behavior is removing obsolete splits without any warning /
>>>>>>> exception. It is OK to add an info level logging if we want to. It is a
>>>>>>> clear intention if the user has explicitly changed subscription and
>>>>>>> restarted the job. There is no need to add a config to double confirm.
>>>>>>>
>>>>>>> Regards,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Leonard,
>>>>>>>>
>>>>>>>> If the SplitEnumerator receives all splits after a restart, it
>>>>>>>> becomes straightforward to clear and un-assign the unmatched splits
>>>>>>>> (by checking whether they match the source options). However, a key
>>>>>>>> question arises: *should we automatically discard obsolete splits,
>>>>>>>> or explicitly notify the user via an exception?*
>>>>>>>>
>>>>>>>> We provide an option `scan.partition-unsubscribe.strategy`:
>>>>>>>> 1. If Strict, throw an exception when encountering removed splits.
>>>>>>>> 2. If Lenient, remove obsolete splits silently.
>>>>>>>>
>>>>>>>> What do you think?
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Hongshun for the update and the detailed analysis of edge
>>>>>>>>> cases. The updated FLIP looks good to me now.
>>>>>>>>>
>>>>>>>>> One last implementation detail about a scenario in the motivation
>>>>>>>>> section:
>>>>>>>>>
>>>>>>>>> *Restart with changed subscription: During restart, if the source
>>>>>>>>> options remove a topic or table, the splits which have already been
>>>>>>>>> assigned cannot be removed.*
>>>>>>>>>
>>>>>>>>> Could you clarify how we resolve this in the Kafka connector?
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi devs,
>>>>>>>>> If there are no further suggestions, I will start the voting
>>>>>>>>> tomorrow.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>
>>>>>>>>>> I have updated the content of this FLIP. The key point is that:
>>>>>>>>>>
>>>>>>>>>> When the split enumerator receives a split, *the split must
>>>>>>>>>> already exist in pendingSplitAssignment or assignedSplitAssignment*.
>>>>>>>>>>
>>>>>>>>>>    - If the split is in pendingSplitAssignment, ignore it.
>>>>>>>>>>    - If the split is in assignedSplitAssignment but has a
>>>>>>>>>>    different taskId, ignore it (this indicates it was already
>>>>>>>>>>    assigned to another task).
>>>>>>>>>>    - If the split is in assignedSplitAssignment and shares the
>>>>>>>>>>    same taskId, move the assignment from assignedSplitAssignment to
>>>>>>>>>>    pendingSplitAssignment so that it is assigned again.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> To make it easier to understand why these strategies are used, I
>>>>>>>>>> added some examples and pictures.
>>>>>>>>>>
>>>>>>>>>> Could you help me check whether there are still any problems?
>>>>>>>>>>
>>>>>>>>>> Best
>>>>>>>>>> Hongshun
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>>>
>>>>>>>>>>> The underlying implementation and communication mechanisms of
>>>>>>>>>>> Flink Source indeed involve many intricate details. We discussed
>>>>>>>>>>> the issue of split re-assignment in specific scenarios and,
>>>>>>>>>>> fortunately, the final decision turned out to be pretty clear.
>>>>>>>>>>>
>>>>>>>>>>> +1 to Becket’s proposal, which keeps the framework cleaner and more
>>>>>>>>>>> flexible.
>>>>>>>>>>> +1 to Hongshun’s point to provide comprehensive guidance for
>>>>>>>>>>> connector developers.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Leonard
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>
>>>>>>>>>>> Got it. You’re suggesting we should not handle this in the
>>>>>>>>>>> source framework but instead let the split enumerator manage these
>>>>>>>>>>> three scenarios.
>>>>>>>>>>>
>>>>>>>>>>> Let me explain why I originally favored handling it in the
>>>>>>>>>>> framework: I'm concerned that connector developers might overlook
>>>>>>>>>>> certain edge cases (after all, even we needed extensive discussion
>>>>>>>>>>> to fully clarify the logic).
>>>>>>>>>>>
>>>>>>>>>>> However, your approach keeps the framework cleaner and more
>>>>>>>>>>> flexible, so I will adopt it.
>>>>>>>>>>>
>>>>>>>>>>> Perhaps, in this FLIP, we should focus on providing
>>>>>>>>>>> comprehensive guidance for connector developers: explain how to
>>>>>>>>>>> implement a split enumerator, including the underlying challenges
>>>>>>>>>>> and their solutions.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Additionally, we can use the Kafka connector as a reference
>>>>>>>>>>> implementation to demonstrate the practical steps. This way,
>>>>>>>>>>> developers who want to implement similar connectors can directly
>>>>>>>>>>> reference this example.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> It would be good to not expose runtime details to the source
>>>>>>>>>>>> implementation if possible.
>>>>>>>>>>>>
>>>>>>>>>>>> Today, the split enumerator implementations are expected to
>>>>>>>>>>>> track the split assignment.
>>>>>>>>>>>>
>>>>>>>>>>>> Assuming the split enumerator implementation keeps a split
>>>>>>>>>>>> assignment map, that means the enumerator should already know
>>>>>>>>>>>> whether a split is assigned or unassigned. So it can handle the
>>>>>>>>>>>> three scenarios you mentioned.
>>>>>>>>>>>>
>>>>>>>>>>>>> The split is reported by a reader during a global restoration.
>>>>>>>>>>>>
>>>>>>>>>>>> The split enumerator should have just been restored / created.
>>>>>>>>>>>> If the enumerator expects a full reassignment of splits upon
>>>>>>>>>>>> global recovery, there should be no assigned splits to that reader
>>>>>>>>>>>> in the split assignment mapping.
>>>>>>>>>>>>
>>>>>>>>>>>>> The split is reported by a reader during a partial failure
>>>>>>>>>>>>> recovery.
>>>>>>>>>>>>
>>>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, the
>>>>>>>>>>>> split assignment map in the enumerator implementation should
>>>>>>>>>>>> already have some split assignments for the reader. Therefore it is
>>>>>>>>>>>> a partial failover. If the source supports split reassignment on
>>>>>>>>>>>> recovery, the enumerator can assign splits that are different from
>>>>>>>>>>>> the reported assignment of that reader in the
>>>>>>>>>>>> SplitEnumeratorContext, or it can also assign the same splits. In
>>>>>>>>>>>> any case, the enumerator knows that this is a partial recovery
>>>>>>>>>>>> because the assignment map is non-empty.
>>>>>>>>>>>>
>>>>>>>>>>>>> The split is not reported by a reader, but is assigned after
>>>>>>>>>>>>> the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>>
>>>>>>>>>>>> This is actually one of the steps in partial failure
>>>>>>>>>>>> recovery. SplitEnumerator.addSplitsBack() will be called first,
>>>>>>>>>>>> before SplitEnumerator.addReader() is called for the recovered
>>>>>>>>>>>> reader. When SplitEnumerator.addSplitsBack() is invoked, it is for
>>>>>>>>>>>> sure a partial recovery. And the enumerator should remove these
>>>>>>>>>>>> splits from the split assignment map as if they were never assigned.
>>>>>>>>>>>>
>>>>>>>>>>>> I think this should work, right?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>
>>>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>>>>>>>>> I have a concern: *the current registeredReaders in
>>>>>>>>>>>>> SourceCoordinatorContext will be removed after subtaskResetor execution
>>>>>>>>>>>>> on failure*. However, this approach has merit.
>>>>>>>>>>>>>
>>>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and
>>>>>>>>>>>>>    split 2 to Reader B.
>>>>>>>>>>>>>    3. Failure scenario: Reader A fails before checkpointing.
>>>>>>>>>>>>>    Since this is a partial failure, only Reader A restarts.
>>>>>>>>>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports
>>>>>>>>>>>>>    split (1).
>>>>>>>>>>>>>
>>>>>>>>>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>>>>>>>>>> re-registration which will cause data loss.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thus, when the enumerator receives a split, the split may
>>>>>>>>>>>>> originate from three scenarios:
>>>>>>>>>>>>>
>>>>>>>>>>>>>    1. The split is reported by a reader during a global
>>>>>>>>>>>>>    restoration.
>>>>>>>>>>>>>    2. The split is reported by a reader during a partial
>>>>>>>>>>>>>    failure recovery.
>>>>>>>>>>>>>    3. The split is not reported by a reader, but is assigned
>>>>>>>>>>>>>    after the last successful checkpoint and was never
>>>>>>>>>>>>>    acknowledged.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the first scenario (global restore), the split should
>>>>>>>>>>>>> be re-distributed. For the latter two scenarios (partial failover
>>>>>>>>>>>>> and post-checkpoint assignment), we need to reassign the split to
>>>>>>>>>>>>> its originally assigned subtask.
>>>>>>>>>>>>>
>>>>>>>>>>>>> By implementing a method in the SplitEnumerator context to
>>>>>>>>>>>>> track each assigned split's status, the system can correctly
>>>>>>>>>>>>> identify and resolve split ownership in all three scenarios. *What
>>>>>>>>>>>>> about adding a `SplitRecoveryType splitRecoveryType(Split split)`
>>>>>>>>>>>>> method to SplitEnumeratorContext?* SplitRecoveryType is an enum
>>>>>>>>>>>>> including `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and
>>>>>>>>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you think? Are there any details or scenarios I
>>>>>>>>>>>>> haven't considered? Looking forward to your advice.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Hongshun
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <
>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The current pattern for handling a new reader registration is as
>>>>>>>>>>>>>> follows:
>>>>>>>>>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>>>>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>>>>>>>>> 3. Let the split enumerator get whatever information it wants
>>>>>>>>>>>>>> from the context and do its job.
>>>>>>>>>>>>>> This pattern decouples the information passing from the reader
>>>>>>>>>>>>>> registration notification. This makes the API extensible: we can
>>>>>>>>>>>>>> add more information (e.g. reported assigned splits in our case)
>>>>>>>>>>>>>> about the reader to the context without introducing new methods.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Introducing a new method addSplitsBackOnRecovery() is redundant
>>>>>>>>>>>>>> given the above pattern. Do we really need it?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently for the splits
>>>>>>>>>>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>>>>>>>>>>>>> > receives splits being added back:
>>>>>>>>>>>>>> > 1. Job-level restore: the job is restored, and splits from the readers'
>>>>>>>>>>>>>> > state are reported by ReaderRegistrationEvent.
>>>>>>>>>>>>>> > 2. Reader-level restart: a reader is restarted but not the whole job, and
>>>>>>>>>>>>>> > the splits assigned to it after the last successful checkpoint are added
>>>>>>>>>>>>>> > back. This is what addSplitsBack used to do.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > In these two situations, the enumerator will choose different strategies.
>>>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed across readers
>>>>>>>>>>>>>> > according to the current partitioner strategy.
>>>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to
>>>>>>>>>>>>>> > the same reader they were originally assigned to, preserving locality and
>>>>>>>>>>>>>> > avoiding unnecessary redistribution.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between these two
>>>>>>>>>>>>>> > scenarios. I originally deprecated the former addSplitsBack(List<SplitT>
>>>>>>>>>>>>>> > splits, int subtaskId) and added a new addSplitsBack(List<SplitT> splits,
>>>>>>>>>>>>>> > int subtaskId, boolean reportedByReader).
>>>>>>>>>>>>>> > Leonard suggested using a separate method, addSplitsBackOnRecovery, so that
>>>>>>>>>>>>>> > the current addSplitsBack is not affected.
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > Best
>>>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a
>>>>>>>>>>>>>> > > > default implementation? In this way, we can provide better backward
>>>>>>>>>>>>>> > > > compatibility and also make it easier for developers to understand.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > I am curious what would the enumerator do differently for the splits
>>>>>>>>>>>>>> > > added via addSplitsBackOnRecovery() V.S. addSplitsBack()? Today,
>>>>>>>>>>>>>> > > addSplitsBack() is also only called upon recovery. So the new method
>>>>>>>>>>>>>> > > seems confusing. One thing worth clarifying is if the Source implements
>>>>>>>>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should the splits
>>>>>>>>>>>>>> > > reported by the readers also be added back to the SplitEnumerator via the
>>>>>>>>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator explicitly query the
>>>>>>>>>>>>>> > > registered reader information via the SplitEnumeratorContext to get the
>>>>>>>>>>>>>> > > originally assigned splits when addReader() is invoked? I was assuming
>>>>>>>>>>>>>> > > the latter in the beginning, so the behavior of addSplitsBack() remains
>>>>>>>>>>>>>> > > unchanged, but I am not opposed to doing the former.
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility issue you see if
>>>>>>>>>>>>>> > > we do not have a separate addSplitsBackOnRecovery() method? Even without
>>>>>>>>>>>>>> > > this new method, the behavior remains exactly the same unless the end
>>>>>>>>>>>>>> > > users implement the mix-in interface of
>>>>>>>>>>>>>> > > "SupportSplitReassignmentOnRecovery", right?
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>>>> > >
>>>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was first proposed.
>>>>>>>>>>>>>> > > > Thank you for your valuable feedback. Based on your suggestions, the
>>>>>>>>>>>>>> > > > FLIP has undergone several rounds of revisions.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there are no further
>>>>>>>>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > Best
>>>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > [1]
>>>>>>>>>>>>>> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>>>> > > > > Thanks for your advice. It makes sense and I have modified it.
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> I only have one comment on one API design:
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with
>>>>>>>>>>>>>> > > > >> param isReportedByReader instead. Because the enumerator can apply
>>>>>>>>>>>>>> > > > >> different reassignment policies based on the context.
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a
>>>>>>>>>>>>>> > > > >> default implementation? In this way, we can provide better backward
>>>>>>>>>>>>>> > > > >> compatibility and also make it easier for developers to understand.
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> I think that's a great idea! I have added the
>>>>>>>>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. If a Source
>>>>>>>>>>>>>> > > > >> implements this interface, it indicates that the source operator needs
>>>>>>>>>>>>>> > > > >> to report splits to the enumerator and receive reassignment. [1]
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> [1]
>>>>>>>>>>>>>> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>> > > > >>
>>>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> I think the convention for such optional features in Source is via
>>>>>>>>>>>>>> > > > >>> mix-in interfaces. So instead of adding a method to the SourceReader,
>>>>>>>>>>>>>> > > > >>> maybe we should introduce an interface SupportSplitReassignmentOnRecovery
>>>>>>>>>>>>>> > > > >>> with this method. If a Source implementation implements that interface,
>>>>>>>>>>>>>> > > > >>> then the SourceOperator will check the desired behavior and act
>>>>>>>>>>>>>> > > > >>> accordingly.
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>>>>>>>>>>> > [email protected]
>>>>>>>>>>>>>> > > > >
>>>>>>>>>>>>>> > > > >>> wrote:
>>>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>>>> > > > >>>> Hi devs,
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback
>>>>>>>>>>>>>> > > > >>>> and suggestions.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new
>>>>>>>>>>>>>> contract makes good
>>>>>>>>>>>>>> > > > sense
>>>>>>>>>>>>>> > > > >>>> to me and effectively addresses the issues I
>>>>>>>>>>>>>> encountered at the
>>>>>>>>>>>>>> > > > beginning
>>>>>>>>>>>>>> > > > >>>> of the design.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by
>>>>>>>>>>>>>> default, primarily
>>>>>>>>>>>>>> > for
>>>>>>>>>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> >  For these reasons, we do not expect the Split
>>>>>>>>>>>>>> objects to be
>>>>>>>>>>>>>> > huge,
>>>>>>>>>>>>>> > > > >>>> and we are not trying to design for huge Split
>>>>>>>>>>>>>> objects either as
>>>>>>>>>>>>>> > they
>>>>>>>>>>>>>> > > > will
>>>>>>>>>>>>>> > > > >>>> have problems even today.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>    1.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>    Not all existing connectors match this rule.
>>>>>>>>>>>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog
>>>>>>>>>>>>>> split may contain
>>>>>>>>>>>>>> > > > >>>>    hundreds (or even more) snapshot split
>>>>>>>>>>>>>> completion records. This
>>>>>>>>>>>>>> > > > state is
>>>>>>>>>>>>>> > > > >>>>    large and is currently transmitted
>>>>>>>>>>>>>> incrementally through
>>>>>>>>>>>>>> > multiple
>>>>>>>>>>>>>> > > > >>>>    BinlogSplitMetaEvent messages. Since the binlog
>>>>>>>>>>>>>> reader operates
>>>>>>>>>>>>>> > > > >>>>    with single parallelism, reporting the full
>>>>>>>>>>>>>> split state on
>>>>>>>>>>>>>> > recovery
>>>>>>>>>>>>>> > > > >>>>    could be inefficient or even infeasible.
>>>>>>>>>>>>>> > > > >>>>    For such sources, it would be better to provide
>>>>>>>>>>>>>> a mechanism to
>>>>>>>>>>>>>> > skip
>>>>>>>>>>>>>> > > > >>>>    split reporting during restart until they
>>>>>>>>>>>>>> redesign and reduce
>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>> > > > >>>>    split size.
>>>>>>>>>>>>>> > > > >>>>    2.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>    Not all enumerators maintain unassigned splits
>>>>>>>>>>>>>> in state.
>>>>>>>>>>>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka
>>>>>>>>>>>>>> connector's)
>>>>>>>>>>>>>> > do
>>>>>>>>>>>>>> > > > >>>>    not track or persistently manage unassigned
>>>>>>>>>>>>>> splits. Requiring
>>>>>>>>>>>>>> > them
>>>>>>>>>>>>>> > > > to
>>>>>>>>>>>>>> > > > >>>>    handle re-registration would add unnecessary
>>>>>>>>>>>>>> complexity. Even
>>>>>>>>>>>>>> > > > if we
>>>>>>>>>>>>>> > > > >>>>    implemented this in the Kafka connector, the
>>>>>>>>>>>>>> Kafka connector
>>>>>>>>>>>>>> > is
>>>>>>>>>>>>>> > > > now decoupled
>>>>>>>>>>>>>> > > > >>>>    from the Flink version, so we would also need to make sure
>>>>>>>>>>>>>> that older versions
>>>>>>>>>>>>>> > remain
>>>>>>>>>>>>>> > > > >>>>    compatible.
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing a
>>>>>>>>>>>>>> new method:
>>>>>>>>>>>>>> > boolean
>>>>>>>>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with
>>>>>>>>>>>>>> a default
>>>>>>>>>>>>>> > > > >>>> implementation returning false. This allows source
>>>>>>>>>>>>>> readers to opt
>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>> > > > >>>> to split reassignment only when necessary. Since
>>>>>>>>>>>>>> the new contract
>>>>>>>>>>>>>> > > > already
>>>>>>>>>>>>>> > > > >>>> places the responsibility for split assignment on
>>>>>>>>>>>>>> the enumerator,
>>>>>>>>>>>>>> > not
>>>>>>>>>>>>>> > > > >>>> reporting splits by default is a safe and clean
>>>>>>>>>>>>>> default behavior.
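A rough sketch of the opt-in default method proposed above. RecoverableReader is a hypothetical stand-in for SourceReader, and only the method name shouldReassignSplitsOnRecovery() comes from the proposal. The point is that a connector written before the method existed keeps compiling and keeps the old behavior.

```java
import java.util.List;

// Stand-in for SourceReader; NOT the real Flink interface.
interface RecoverableReader {
    void addSplits(List<String> splits);

    // Proposed opt-in flag: returning false keeps today's behavior.
    default boolean shouldReassignSplitsOnRecovery() {
        return false;
    }
}

class DefaultMethodSketch {
    // An existing connector that knows nothing about the new method.
    static RecoverableReader existingConnector() {
        return new RecoverableReader() {
            @Override
            public void addSplits(List<String> splits) {
                // reading logic omitted in this sketch
            }
        };
    }

    // A connector that explicitly opts in to reassignment on recovery.
    static RecoverableReader optInConnector() {
        return new RecoverableReader() {
            @Override
            public void addSplits(List<String> splits) {
                // reading logic omitted in this sketch
            }

            @Override
            public boolean shouldReassignSplitsOnRecovery() {
                return true;
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(existingConnector().shouldReassignSplitsOnRecovery());
        System.out.println(optInConnector().shouldReassignSplitsOnRecovery());
    }
}
```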
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> I’ve updated the implementation and the FLIP
>>>>>>>>>>>>>> accordingly [1]. It
>>>>>>>>>>>>>> > is quite a
>>>>>>>>>>>>>> > > > >>>> big change. In particular, for the Kafka
>>>>>>>>>>>>>> connector, we can now use
>>>>>>>>>>>>>> > a
>>>>>>>>>>>>>> > > > >>>> pluggable SplitPartitioner to support different
>>>>>>>>>>>>>> split assignment
>>>>>>>>>>>>>> > > > >>>> strategies (e.g., default, round-robin).
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> [1]
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission
>>>>>>>>>>>>>> cost, because the
>>>>>>>>>>>>>> > full
>>>>>>>>>>>>>> > > > >>>>> split transmission has to happen in the initial
>>>>>>>>>>>>>> assignment phase
>>>>>>>>>>>>>> > > > already.
>>>>>>>>>>>>>> > > > >>>>> And in the future, we probably want to also
>>>>>>>>>>>>>> introduce some kind
>>>>>>>>>>>>>> > of
>>>>>>>>>>>>>> > > > workload
>>>>>>>>>>>>>> > > > >>>>> balance across source readers, e.g. based on the
>>>>>>>>>>>>>> per-split
>>>>>>>>>>>>>> > > > throughput or
>>>>>>>>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous
>>>>>>>>>>>>>> clusters. For
>>>>>>>>>>>>>> > these
>>>>>>>>>>>>>> > > > >>>>> reasons, we do not expect the Split objects to be
>>>>>>>>>>>>>> huge, and we
>>>>>>>>>>>>>> > are
>>>>>>>>>>>>>> > > > not
>>>>>>>>>>>>>> > > > >>>>> trying to design for huge Split objects either as
>>>>>>>>>>>>>> they will have
>>>>>>>>>>>>>> > > > problems
>>>>>>>>>>>>>> > > > >>>>> even today.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss, please
>>>>>>>>>>>>>> see the reply
>>>>>>>>>>>>>> > below:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader
>>>>>>>>>>>>>> B reports (3
>>>>>>>>>>>>>> > and 4)
>>>>>>>>>>>>>> > > > >>>>>> upon restart.
>>>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports
>>>>>>>>>>>>>> and performs
>>>>>>>>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet,
>>>>>>>>>>>>>> both readers have
>>>>>>>>>>>>>> > empty
>>>>>>>>>>>>>> > > > >>>>>> states.
>>>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all
>>>>>>>>>>>>>> four splits are
>>>>>>>>>>>>>> > lost.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> The reader registration happens in the
>>>>>>>>>>>>>> SourceOperator.open(),
>>>>>>>>>>>>>> > which
>>>>>>>>>>>>>> > > > >>>>> means the task is still in the initializing
>>>>>>>>>>>>>> state, therefore the
>>>>>>>>>>>>>> > > > checkpoint
>>>>>>>>>>>>>> > > > >>>>> should not be triggered until the enumerator
>>>>>>>>>>>>>> receives all the
>>>>>>>>>>>>>> > split
>>>>>>>>>>>>>> > > > reports.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from
>>>>>>>>>>>>>> the TM to the JM
>>>>>>>>>>>>>> > is
>>>>>>>>>>>>>> > > > >>>>> async. So it is possible that the
>>>>>>>>>>>>>> SourceOperator.open() has
>>>>>>>>>>>>>> > returned,
>>>>>>>>>>>>>> > > > but
>>>>>>>>>>>>>> > > > >>>>> the enumerator has not received the split
>>>>>>>>>>>>>> reports. However,
>>>>>>>>>>>>>> > because
>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>> > > > >>>>> task status update RPC call goes to the same
>>>>>>>>>>>>>> channel as the split
>>>>>>>>>>>>>> > > > reports
>>>>>>>>>>>>>> > > > >>>>> call, the task status RPC call will happen
>>>>>>>>>>>>>> after the split
>>>>>>>>>>>>>> > > > reports call
>>>>>>>>>>>>>> > > > >>>>> on the JM side. Therefore, on the JM side, the
>>>>>>>>>>>>>> SourceCoordinator
>>>>>>>>>>>>>> > will
>>>>>>>>>>>>>> > > > >>>>> always first receive the split reports, then
>>>>>>>>>>>>>> receive the
>>>>>>>>>>>>>> > checkpoint
>>>>>>>>>>>>>> > > > request.
>>>>>>>>>>>>>> > > > >>>>> This "happens-before" relationship is kind of
>>>>>>>>>>>>>> important to
>>>>>>>>>>>>>> > guarantee
>>>>>>>>>>>>>> > > > >>>>> the consistent state between enumerator and
>>>>>>>>>>>>>> readers.
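To illustrate the ordering argument above, here is a loose model rather than Flink's actual RPC stack. It assumes the "same channel" behaves like a single FIFO queue drained by one thread, which is modelled with a single-threaded executor. OrderedChannelSketch and the event names are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class OrderedChannelSketch {
    // Sends three messages over one FIFO "channel" and returns the order in
    // which the coordinator side observed them.
    static List<String> deliver() {
        List<String> seenByCoordinator = new ArrayList<>();
        ExecutorService channel = Executors.newSingleThreadExecutor();
        // The TM sends the split report first (during open()) ...
        channel.execute(() -> seenByCoordinator.add("split-report"));
        // ... then the task status update ...
        channel.execute(() -> seenByCoordinator.add("task-running"));
        // ... and a checkpoint is only requested once the task is running.
        channel.execute(() -> seenByCoordinator.add("checkpoint-request"));
        channel.shutdown();
        try {
            if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("channel did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByCoordinator;
    }

    public static void main(String[] args) {
        System.out.println(deliver());
    }
}
```

Because one thread drains the queue in submission order, the split report is always observed before the checkpoint request in this model.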
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>>> splits (1 and 2), and
>>>>>>>>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but
>>>>>>>>>>>>>> only reassigns
>>>>>>>>>>>>>> > splits 1
>>>>>>>>>>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered.
>>>>>>>>>>>>>> Only splits 1
>>>>>>>>>>>>>> > and 2
>>>>>>>>>>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and
>>>>>>>>>>>>>> 4 are not
>>>>>>>>>>>>>> > persisted.
>>>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this
>>>>>>>>>>>>>> checkpoint, splits 3
>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>> > > > 4
>>>>>>>>>>>>>> > > > >>>>>> will be permanently lost.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to let
>>>>>>>>>>>>>> the enumerator
>>>>>>>>>>>>>> > > > >>>>> implementation handle this. That means if the
>>>>>>>>>>>>>> enumerator relies
>>>>>>>>>>>>>> > on
>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>> > > > >>>>> initial split reports from the source readers, it
>>>>>>>>>>>>>> should maintain
>>>>>>>>>>>>>> > > > these
>>>>>>>>>>>>>> > > > >>>>> reports by itself. In the above example, the
>>>>>>>>>>>>>> enumerator will need
>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>> > > > >>>>> remember that 3 and 4 are not assigned and put it
>>>>>>>>>>>>>> into its own
>>>>>>>>>>>>>> > state.
>>>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned to
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> > SourceReaders
>>>>>>>>>>>>>> > > > >>>>> is completely owned by the SourceReaders.
>>>>>>>>>>>>>> Enumerators can
>>>>>>>>>>>>>> > remember
>>>>>>>>>>>>>> > > > the
>>>>>>>>>>>>>> > > > >>>>> assignments but cannot change them, even when the
>>>>>>>>>>>>>> source reader
>>>>>>>>>>>>>> > > > recovers /
>>>>>>>>>>>>>> > > > >>>>> restarts.
>>>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the
>>>>>>>>>>>>>> source readers will
>>>>>>>>>>>>>> > > > >>>>> return the ownership of the splits to the
>>>>>>>>>>>>>> enumerator. So the
>>>>>>>>>>>>>> > > > enumerator is
>>>>>>>>>>>>>> > > > >>>>> responsible for maintaining these splits, until
>>>>>>>>>>>>>> they are assigned
>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>> > > > a
>>>>>>>>>>>>>> > > > >>>>> source reader again.
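A small sketch of what "the enumerator maintains these splits" could look like for Scenario 1. PendingSplitsSketch is a hypothetical class, splits are plain strings, and snapshotState() only mimics the idea of enumerator checkpoint state. It is not the FLIP's implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

class PendingSplitsSketch {
    // Splits returned by readers and not yet handed out again.
    private final TreeSet<String> pending = new TreeSet<>();
    // Splits the enumerator has assigned, keyed by split, valued by reader id.
    private final Map<String, Integer> assigned = new HashMap<>();

    // A reader hands ownership of its restored splits back to the enumerator.
    void onReaderReport(List<String> reportedSplits) {
        pending.addAll(reportedSplits);
    }

    // Ownership moves to the reader only at assignment time.
    void assign(String split, int readerId) {
        if (pending.remove(split)) {
            assigned.put(split, readerId);
        }
    }

    // Anything still pending must be part of the enumerator's checkpoint,
    // otherwise it would be lost on the next restore.
    List<String> snapshotState() {
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        PendingSplitsSketch enumerator = new PendingSplitsSketch();
        enumerator.onReaderReport(List.of("1", "2")); // reader A
        enumerator.onReaderReport(List.of("3", "4")); // reader B
        enumerator.assign("1", 0);
        enumerator.assign("2", 0);
        System.out.println(enumerator.snapshotState());
    }
}
```

In Scenario 1, splits 3 and 4 would sit in the enumerator snapshot instead of disappearing.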
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> There are other cases where there may be conflicting
>>>>>>>>>>>>>> information
>>>>>>>>>>>>>> > between
>>>>>>>>>>>>>> > > > >>>>> reader and enumerator. For example, consider the
>>>>>>>>>>>>>> following
>>>>>>>>>>>>>> > sequence:
>>>>>>>>>>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) upon
>>>>>>>>>>>>>> restart.
>>>>>>>>>>>>>> > > > >>>>> 2. enumerator receives the report and assigns
>>>>>>>>>>>>>> both 1 and 2 to
>>>>>>>>>>>>>> > reader
>>>>>>>>>>>>>> > > > B.
>>>>>>>>>>>>>> > > > >>>>> 3. reader A fails before checkpointing. This
>>>>>>>>>>>>>> is a partial
>>>>>>>>>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report
>>>>>>>>>>>>>> splits (1 and 2)
>>>>>>>>>>>>>> > to
>>>>>>>>>>>>>> > > > >>>>> the enumerator.
>>>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report
>>>>>>>>>>>>>> because it has
>>>>>>>>>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should
>>>>>>>>>>>>>> be the source of
>>>>>>>>>>>>>> > > > truth
>>>>>>>>>>>>>> > > > >>>>> for split ownership.
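The five-step sequence above can be sketched as follows. OwnershipSketch is hypothetical, reader A is id 0 and reader B is id 1, and the rule "ignore a report for a split that is currently assigned to a different reader" is one possible reading of the contract, not necessarily what the FLIP ends up specifying.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class OwnershipSketch {
    // The enumerator's view of who owns each split: split -> reader id.
    private final Map<String, Integer> owner = new HashMap<>();

    // Returns the splits the enumerator accepts back from the reporting reader.
    // Splits that are already assigned to another reader are ignored.
    List<String> onReaderReport(int readerId, List<String> reported) {
        List<String> accepted = new ArrayList<>();
        for (String split : reported) {
            Integer current = owner.get(split);
            if (current == null || current == readerId) {
                owner.remove(split);
                accepted.add(split);
            }
        }
        return accepted;
    }

    void assign(String split, int readerId) {
        owner.put(split, readerId);
    }

    public static void main(String[] args) {
        OwnershipSketch enumerator = new OwnershipSketch();
        System.out.println(enumerator.onReaderReport(0, List.of("1", "2"))); // step 1
        enumerator.assign("1", 1); // step 2: both splits go to reader B
        enumerator.assign("2", 1);
        System.out.println(enumerator.onReaderReport(0, List.of("1", "2"))); // steps 4-5
    }
}
```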
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>>>>>>>>>>> > > > [email protected]>
>>>>>>>>>>>>>> > > > >>>>> wrote:
>>>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning
>>>>>>>>>>>>>> (and it was also
>>>>>>>>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow
>>>>>>>>>>>>>> more flexibility
>>>>>>>>>>>>>> > in
>>>>>>>>>>>>>> > > > >>>>>> reassigning all splits. However, there are a few
>>>>>>>>>>>>>> potential
>>>>>>>>>>>>>> > issues.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>>>>>>>>> > > > >>>>>> If we pass the full split objects (rather than
>>>>>>>>>>>>>> just split IDs),
>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>> > > > >>>>>> data size could be significant, leading to high
>>>>>>>>>>>>>> overhead during
>>>>>>>>>>>>>> > > > >>>>>> transmission — especially when many splits are
>>>>>>>>>>>>>> involved.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> A risk of split loss exists unless we have a
>>>>>>>>>>>>>> mechanism to make
>>>>>>>>>>>>>> > sure
>>>>>>>>>>>>>> > > > >>>>>> a checkpoint can only be taken after all the splits are
>>>>>>>>>>>>>> reassigned.
>>>>>>>>>>>>>> > > > >>>>>> There are scenarios where splits could be lost
>>>>>>>>>>>>>> due to
>>>>>>>>>>>>>> > inconsistent
>>>>>>>>>>>>>> > > > >>>>>> state handling during recovery:
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned
>>>>>>>>>>>>>> splits (1 and 2),
>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>>>>>>>>>> > > > >>>>>>    2. The enumerator receives these reports but
>>>>>>>>>>>>>> only reassigns
>>>>>>>>>>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then
>>>>>>>>>>>>>> triggered. Only splits 1
>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3
>>>>>>>>>>>>>> and 4 are not
>>>>>>>>>>>>>> > > > persisted.
>>>>>>>>>>>>>> > > > >>>>>>    4. If the job is later restarted from this
>>>>>>>>>>>>>> checkpoint, splits
>>>>>>>>>>>>>> > 3
>>>>>>>>>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>>>> > and
>>>>>>>>>>>>>> > > > >>>>>>    4) upon restart.
>>>>>>>>>>>>>> > > > >>>>>>    2. Before the enumerator receives all reports
>>>>>>>>>>>>>> and performs
>>>>>>>>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>>>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet,
>>>>>>>>>>>>>> both readers
>>>>>>>>>>>>>> > have
>>>>>>>>>>>>>> > > > >>>>>>    empty states.
>>>>>>>>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all
>>>>>>>>>>>>>> four splits are
>>>>>>>>>>>>>> > > > lost.
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might
>>>>>>>>>>>>>> mitigate these
>>>>>>>>>>>>>> > > > risks!
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> Best
>>>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>>>> > > > >>>>>> wrote:
>>>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In
>>>>>>>>>>>>>> terms of the
>>>>>>>>>>>>>> > > > updated
>>>>>>>>>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can
>>>>>>>>>>>>>> keep the protocol
>>>>>>>>>>>>>> > > > simple. One
>>>>>>>>>>>>>> > > > >>>>>>> alternative way to achieve this behavior is
>>>>>>>>>>>>>> the following:
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the
>>>>>>>>>>>>>> SourceOperator sends
>>>>>>>>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently
>>>>>>>>>>>>>> assigned splits to
>>>>>>>>>>>>>> > the
>>>>>>>>>>>>>> > > > >>>>>>> enumerator. It does not add these splits to the
>>>>>>>>>>>>>> SourceReader.
>>>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use the
>>>>>>>>>>>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to
>>>>>>>>>>>>>> assign the splits.
>>>>>>>>>>>>>> > (not
>>>>>>>>>>>>>> > > > via the
>>>>>>>>>>>>>> > > > >>>>>>> response of the ReaderRegistrationEvent, this
>>>>>>>>>>>>>> allows async
>>>>>>>>>>>>>> > split
>>>>>>>>>>>>>> > > > assignment
>>>>>>>>>>>>>> > > > >>>>>>> in case the enumerator wants to wait until all
>>>>>>>>>>>>>> the readers are
>>>>>>>>>>>>>> > > > registered)
>>>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>>>>>>>>>> SourceReader.addSplits()
>>>>>>>>>>>>>> > when
>>>>>>>>>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the
>>>>>>>>>>>>>> enumerator.
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split
>>>>>>>>>>>>>> reassignment upon
>>>>>>>>>>>>>> > restart
>>>>>>>>>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign
>>>>>>>>>>>>>> splits.
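A toy walk-through of the three protocol steps above, under several assumptions: splits are strings, events are plain method calls, and the enumerator waits for all readers and then assigns round-robin. Operator, Enumerator and their methods are hypothetical names for this sketch only and do not mirror the real SourceOperator or SplitEnumerator signatures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class RegistrationProtocolSketch {
    static class Operator {
        final List<String> restoredSplits;
        final List<String> readerSplits = new ArrayList<>();

        Operator(List<String> restoredSplits) {
            this.restoredSplits = restoredSplits;
        }

        // Step 1: report restored splits; do NOT add them to the reader.
        List<String> registrationPayload() {
            return restoredSplits;
        }

        // Step 3: the reader only receives splits via an add-splits event.
        void onAddSplits(List<String> splits) {
            readerSplits.addAll(splits);
        }
    }

    static class Enumerator {
        final int parallelism;
        final Map<Integer, List<String>> reports = new TreeMap<>();

        Enumerator(int parallelism) {
            this.parallelism = parallelism;
        }

        // Step 2: assignment is decoupled from registration. Nothing is
        // assigned until every reader has registered, then round-robin.
        Map<Integer, List<String>> onRegistration(int readerId, List<String> splits) {
            reports.put(readerId, splits);
            Map<Integer, List<String>> assignment = new TreeMap<>();
            if (reports.size() < parallelism) {
                return assignment;
            }
            List<String> all = new ArrayList<>();
            reports.values().forEach(all::addAll);
            Collections.sort(all);
            for (int i = 0; i < all.size(); i++) {
                assignment.computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                        .add(all.get(i));
            }
            return assignment;
        }
    }

    // Reader 0 restores three splits, reader 1 restores none.
    static List<List<String>> run() {
        Operator op0 = new Operator(List.of("a", "b", "c"));
        Operator op1 = new Operator(List.of());
        Enumerator enumerator = new Enumerator(2);
        enumerator.onRegistration(0, op0.registrationPayload());
        Map<Integer, List<String>> assignment =
                enumerator.onRegistration(1, op1.registrationPayload());
        op0.onAddSplits(assignment.getOrDefault(0, List.of()));
        op1.onAddSplits(assignment.getOrDefault(1, List.of()));
        return List.of(op0.readerSplits, op1.readerSplits);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The skew of three splits on one reader and none on the other is rebalanced only because the splits pass through the enumerator again on restart.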
>>>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to
>>>>>>>>>>>>>> ReaderInfo so the
>>>>>>>>>>>>>> > Enumerator
>>>>>>>>>>>>>> > > > >>>>>>> can access it.
>>>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning
>>>>>>>>>>>>>> splits to the from
>>>>>>>>>>>>>> > state
>>>>>>>>>>>>>> > > > >>>>>>> restoration, but
>>>>>>>>>>>>>> > [message truncated...]
>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
