Hi Becket,
Thanks for your explanation.

> For the same three input above, the assignment should be consistently the
> same.

That is exactly what troubles me. For assignment algorithms such as hash,
it does behave the same. But what if we use round-robin? Even with the same
reader information, the same split can be assigned to different readers.
Here is the example I listed before:

   1. *Initial state:* parallelism 2, 2 splits.
   2. *Enumerator action:* Split 1 → Task 1, Split 2 → Task 2.
   3. *Failure scenario:* After Split 2 is assigned to Task 2 but before the
   next checkpoint succeeds, Task 2 restarts.
   4. *Recovery issue:* Split 2 is re-added to the enumerator. The round-robin
   strategy assigns Split 2 to Task 1. Task 1 now has 2 splits and Task 2
   has 0 → imbalanced distribution.
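
To make the scenario concrete, here is a minimal sketch in plain Java. It is not Flink API: RoundRobinAssigner, assign() and addSplitsBack(int) are hypothetical names used only for illustration. It assumes the round-robin strategy keeps a running cursor, and that the task which restarts is the one whose uncheckpointed split is handed back (Task 2 here).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the round-robin scenario; none of these types are Flink classes. */
public class RoundRobinImbalanceSketch {

    /** Stateful round-robin assigner: each split goes to the next task in rotation. */
    static final class RoundRobinAssigner {
        private final int parallelism;
        private int cursor = 0; // number of assignments made so far
        private final Map<Integer, List<String>> assignment = new TreeMap<>();

        RoundRobinAssigner(int parallelism) {
            this.parallelism = parallelism;
            for (int task = 1; task <= parallelism; task++) {
                assignment.put(task, new ArrayList<>());
            }
        }

        /** Assigns the split to the task the cursor currently points at (tasks are 1-based). */
        void assign(String split) {
            int task = (cursor % parallelism) + 1;
            cursor++;
            assignment.get(task).add(split);
        }

        /**
         * Models the effect of an addSplitsBack() call in this sketch: the failed task's
         * uncheckpointed splits are returned and then reassigned by plain round-robin,
         * without remembering which task owned them before.
         */
        void addSplitsBack(int failedTask) {
            List<String> returned = new ArrayList<>(assignment.get(failedTask));
            assignment.get(failedTask).clear();
            for (String split : returned) {
                assign(split);
            }
        }

        Map<Integer, List<String>> assignment() {
            return assignment;
        }
    }

    /** Replays the four steps of the example and returns the final assignment. */
    static Map<Integer, List<String>> runScenario() {
        RoundRobinAssigner assigner = new RoundRobinAssigner(2);
        assigner.assign("split-1"); // goes to Task 1
        assigner.assign("split-2"); // goes to Task 2
        assigner.addSplitsBack(2);  // Task 2 restarts before the next checkpoint
        return assigner.assignment();
    }

    public static void main(String[] args) {
        // Task 1 ends up with both splits, Task 2 with none.
        System.out.println(runScenario());
    }
}
```

With a hash-style assignment (split → task computed only from the split and the parallelism), the same replay would put split-2 back on Task 2, which is why the two strategies behave differently here.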


> Please let me know if you think a meeting would be more efficient.
Yes, I’d like to reach an agreement as soon as possible. If you’re
available, we could schedule a meeting with Leonard as well.

Best,
Hongshun

On Sat, Oct 11, 2025 at 3:59 PM Becket Qin <[email protected]> wrote:

> Hi Hongshun,
>
> I am confused. First of all, regardless of what the assignment algorithm
> is, using SplitEnumeratorContext to return the splits only gives more
> information than using addSplitsBack(). So there should be no regression.
>
> Secondly, at this point, the SplitEnumerator should only take the
> following three inputs to generate the global split assignment:
> 1. the *reader information (num readers, locations, etc.)*
> 2. *all the splits to assign*
> 3. the *configured assignment algorithm*
> Preferably, for the same three inputs above, the assignment should be
> consistently the same. I don't see why it should care about why a new
> reader is added, whether due to partial failover, global failover, or job
> restart.
>
> If you want to do global redistribution on global failover and restart,
> but honor the existing assignment for partial failover, the enumerator will
> just do the following:
> 1. Generate a new global assignment (global redistribution) in start()
> because start() will only be invoked in global failover or restart. That
> means all the readers are also new with empty assignment.
> 2. After the global assignment is generated, it should be honored for the
> whole life cycle. There might be many reader registrations, again for
> different reasons, but the reason does not matter:
>     - reader registration after this job restart
>     - reader registration after this global failover
>     - reader registration due to partial failover which may or may not
> have an addSplitsBack() call.
>     Regardless of the reason, the split enumerator will just enforce the
> global assignment it has already generated, i.e. without split
> redistribution.
>
> Wouldn't that give the behavior you want? I feel the discussion is somehow
> going in circles. Please let me know if you think a meeting would be more
> efficient.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Oct 10, 2025 at 7:58 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi Becket,
>>
>> > Ignore a returned split if it has been assigned to a different reader,
>> otherwise put it back to unassigned splits / pending splits. Then the
>> enumerator assigns new splits to the newly added reader, which may use the
>> previous assignment as a reference. This should work regardless of whether
>> it is a global failover, partial failover, restart, etc. There is no need
>> for the SplitEnumerator to distinguish what failover scenario it is.
>>
>> In this case, it seems that global failover and partial failover share
>> the same distribution strategy if the split has not been assigned to a
>> different reader. However, global failover needs redistribution (this is
>> why we need this FLIP), while partial failover does not. I have no idea how
>> we would distinguish them.
>>
>> What do you think?
>>
>> Best,
>> Hongshun
>>
>> On Sat, Oct 11, 2025 at 12:54 AM Becket Qin <[email protected]> wrote:
>>
>>> Hi Hongshun,
>>>
>>> The problem we are trying to solve here is to give the splits back to
>>> the SplitEnumerator. There are only two types of splits to give back:
>>> 1) splits whose assignment has been checkpointed. - In this case, we
>>> rely on addReader() + SplitEnumeratorContext to give the splits back, this
>>> provides more information associated with those splits.
>>> 2) splits whose assignment has not been checkpointed. -  In this case,
>>> we use addSplitsBack(), there is no reader info to give because the
>>> previous assignment did not take effect to begin with.
>>>
>>> From the SplitEnumerator implementation perspective, the contract is
>>> straightforward.
>>> 1. The SplitEnumerator is the source of truth for assignment.
>>> 2. When the enumerator receives the addSplitsBack() call, it always adds
>>> these splits back to unassigned splits / pending splits.
>>> 3. When the enumerator receives the addReader() call, that means the
>>> reader has no current assignment, and has returned its previous assignment
>>> based on the reader side info. The SplitEnumerator checks the
>>> SplitEnumeratorContext to retrieve the returned splits from that reader
>>> (i.e. previous assignment) and handle them according to its own source of
>>> truth knowledge of assignment - Ignore a returned split if it has been
>>> assigned to a different reader, otherwise put it back to unassigned splits
>>> / pending splits. Then the enumerator assigns new splits to the newly added
>>> reader, which may use the previous assignment as a reference. This should
>>> work regardless of whether it is a global failover, partial failover,
>>> restart, etc. There is no need for the SplitEnumerator to distinguish what
>>> failover scenario it is.
>>>
>>> Would this work?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Oct 10, 2025 at 1:28 AM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>>> Hi Becket,
>>>>  > why do we need to change the behavior of addSplitsBack()? Should it
>>>> remain the same?
>>>>
>>>> How does the enumerator get the splits from ReaderRegistrationEvent and
>>>> then reassign it?
>>>>
>>>> You gave this advice before:
>>>> > 1. Put all the reader information in the SplitEnumerator context.  2.
>>>> notify the enumerator about the new reader registration. 3. let the split
>>>> enumerator get whatever information it wants from the context and do its
>>>> job.
>>>>
>>>> However, each time a source task fails over, its reader info is removed
>>>> from the ConcurrentMap<Integer, ConcurrentMap<Integer, ReaderInfo>>
>>>> registeredReaders. When the source task registers again, it is added
>>>> again. *Thus, registeredReaders cannot tell whether the reader was
>>>> registered before.*
>>>>
>>>> Therefore, enumerator#addReader cannot distinguish the following three
>>>> situations:
>>>> 1. The reader is registered during a global restart. In this case,
>>>> redistribute the splits from the infos (take all the splits off the
>>>> ReaderInfo).
>>>> 2. The reader is registered during a partial failover (before the first
>>>> successful checkpoint). In this case, ignore the splits from the infos
>>>> (leave all the splits in the ReaderInfo alone).
>>>> 3. The reader is registered during a partial failover (after the first
>>>> successful checkpoint). In this case, we need to assign the splits to the
>>>> same reader again (take all the splits off the ReaderInfo but assign them
>>>> to it again).
>>>> We still need the enumerator to distinguish them (using
>>>> pendingSplitAssignment & assignedSplitAssignment). However, it is redundant
>>>> to maintain split assignment information in both the enumerator and the
>>>> enumerator context.
>>>>
>>>> I think it will be simpler if we change the behavior of addSplitsBack:
>>>> just let the enumerator handle these splits based on
>>>> pendingSplitAssignment & assignedSplitAssignment.
>>>>
>>>> What do you think?
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Fri, Oct 10, 2025 at 12:55 PM Becket Qin <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi Hongshun,
>>>>>
>>>>> Thanks for updating the FLIP. A quick question: why do we need to
>>>>> change the behavior of addSplitsBack()? Should it remain the same?
>>>>>
>>>>> Regarding the case of restart with changed subscription. I think the
>>>>> only correct behavior is removing obsolete splits without any warning /
>>>>> exception. It is OK to add an info level logging if we want to. It is a
>>>>> clear intention if the user has explicitly changed subscription and
>>>>> restarted the job. There is no need to add a config to double confirm.
>>>>>
>>>>> Regards,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Thu, Oct 9, 2025 at 7:28 PM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> Hi Leonard,
>>>>>>
>>>>>> If the SplitEnumerator receives all splits after a restart, it
>>>>>> becomes straightforward to clear and un-assign the unmatched
>>>>>> splits (by checking whether they match the source options). However, a
>>>>>> key question arises: *should we automatically discard obsolete splits, or
>>>>>> explicitly notify the user via an exception?*
>>>>>>
>>>>>> We provide an option `scan.partition-unsubscribe.strategy`:
>>>>>> 1. If Strict, throw an exception when encountering removed splits.
>>>>>> 2. If Lenient, silently remove obsolete splits.
>>>>>>
>>>>>> What do you think?
>>>>>>
>>>>>> Best,
>>>>>> Hongshun
>>>>>>
>>>>>> On Thu, Oct 9, 2025 at 9:37 PM Leonard Xu <[email protected]> wrote:
>>>>>>
>>>>>>> Thanks Hongshun for the update and the detailed analysis of the
>>>>>>> edge cases. The updated FLIP looks good to me now.
>>>>>>>
>>>>>>> One last implementation detail about a scenario in the motivation
>>>>>>> section:
>>>>>>>
>>>>>>> *Restart with changed subscription: during restart, if the source
>>>>>>> options remove a topic or table, the splits which have already been
>>>>>>> assigned cannot be removed.*
>>>>>>>
>>>>>>> Could you clarify how we resolve this in the Kafka connector?
>>>>>>>
>>>>>>> Best,
>>>>>>> Leonard
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Oct 9, 2025, at 19:48, Hongshun Wang <[email protected]> wrote:
>>>>>>>
>>>>>>> Hi devs,
>>>>>>> If there are no further suggestions, I will start the voting
>>>>>>> tomorrow.
>>>>>>>
>>>>>>> Best,
>>>>>>> Hongshun
>>>>>>>
>>>>>>> On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Hi Becket and Leonard,
>>>>>>>>
>>>>>>>> I have updated the content of this FLIP. The key point is:
>>>>>>>>
>>>>>>>> When the split enumerator receives a split, *the split must already
>>>>>>>> exist in pendingSplitAssignment or assignedSplitAssignment*.
>>>>>>>>
>>>>>>>>    - If the split is in pendingSplitAssignment, ignore it.
>>>>>>>>    - If the split is in assignedSplitAssignment but has a
>>>>>>>>    different taskId, ignore it (this indicates it was already
>>>>>>>>    assigned to another task).
>>>>>>>>    - If the split is in assignedSplitAssignment and shares the
>>>>>>>>    same taskId, move the assignment from assignedSplitAssignment to
>>>>>>>>    pendingSplitAssignment so that it is assigned again.
>>>>>>>>
>>>>>>>>
>>>>>>>> To make it easier to understand why these strategies are used, I added
>>>>>>>> some examples and pictures.
>>>>>>>>
>>>>>>>> Could you help me check whether there are any remaining problems?
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Hongshun
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks Becket and Hongshun for the insightful discussion.
>>>>>>>>>
>>>>>>>>> The underlying implementation and communication mechanisms of
>>>>>>>>> Flink Source indeed involve many intricate details. We discussed the
>>>>>>>>> issue of split re-assignment in specific scenarios, but fortunately,
>>>>>>>>> the final decision turned out to be pretty clear.
>>>>>>>>>
>>>>>>>>> +1 to Becket’s proposal to keep the framework cleaner and more
>>>>>>>>> flexible.
>>>>>>>>> +1 to Hongshun’s point to provide comprehensive guidance for
>>>>>>>>> connector developers.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Leonard
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Becket,
>>>>>>>>>
>>>>>>>>> I got it. You’re suggesting we should not handle this in the
>>>>>>>>> source framework but instead let the split enumerator manage these
>>>>>>>>> three scenarios.
>>>>>>>>>
>>>>>>>>> Let me explain why I originally favored handling it in the
>>>>>>>>> framework: I'm concerned that connector developers might overlook
>>>>>>>>> certain edge cases (after all, even we needed extensive discussion to
>>>>>>>>> fully clarify the logic).
>>>>>>>>>
>>>>>>>>> However, your approach keeps the framework cleaner and more flexible.
>>>>>>>>> Thus, I will take it.
>>>>>>>>>
>>>>>>>>> Perhaps, in this FLIP, we should focus on providing comprehensive
>>>>>>>>> guidance for connector developers: explain how to implement a
>>>>>>>>> split enumerator, including the underlying challenges and their
>>>>>>>>> solutions.
>>>>>>>>>
>>>>>>>>> Additionally, we can use the Kafka connector as a reference
>>>>>>>>> implementation to demonstrate the practical steps. This way,
>>>>>>>>> developers who want to implement similar connectors can directly
>>>>>>>>> reference this example.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Hongshun
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> It would be good to not expose runtime details to the source
>>>>>>>>>> implementation if possible.
>>>>>>>>>>
>>>>>>>>>> Today, the split enumerator implementations are expected to track
>>>>>>>>>> the split assignment.
>>>>>>>>>>
>>>>>>>>>> Assuming the split enumerator implementation keeps a split
>>>>>>>>>> assignment map, that means the enumerator should already know
>>>>>>>>>> whether a split is assigned or unassigned. So it can handle the
>>>>>>>>>> three scenarios you mentioned.
>>>>>>>>>>
>>>>>>>>>>> The split is reported by a reader during a global restoration.
>>>>>>>>>>>
>>>>>>>>>> The split enumerator should have just been restored / created. If
>>>>>>>>>> the enumerator expects a full reassignment of splits upon global
>>>>>>>>>> recovery, there should be no assigned splits to that reader in the
>>>>>>>>>> split assignment mapping.
>>>>>>>>>>
>>>>>>>>>>> The split is reported by a reader during a partial failure
>>>>>>>>>>> recovery.
>>>>>>>>>>>
>>>>>>>>>> In this case, when SplitEnumerator.addReader() is invoked, the
>>>>>>>>>> split assignment map in the enumerator implementation should already
>>>>>>>>>> have some split assignments for the reader. Therefore it is a partial
>>>>>>>>>> failover. If the source supports split reassignment on recovery, the
>>>>>>>>>> enumerator can assign splits that are different from the reported
>>>>>>>>>> assignment of that reader in the SplitEnumeratorContext, or it can
>>>>>>>>>> also assign the same splits. In any case, the enumerator knows that
>>>>>>>>>> this is a partial recovery because the assignment map is non-empty.
>>>>>>>>>>
>>>>>>>>>>> The split is not reported by a reader, but is assigned after the
>>>>>>>>>>> last successful checkpoint and was never acknowledged.
>>>>>>>>>>>
>>>>>>>>>> This is actually one of the steps in partial failure recovery.
>>>>>>>>>> SplitEnumerator.addSplitsBack() will be called first, before
>>>>>>>>>> SplitEnumerator.addReader() is called for the recovered reader. When
>>>>>>>>>> SplitEnumerator.addSplitsBack() is invoked, it is for sure a partial
>>>>>>>>>> recovery, and the enumerator should remove these splits from the
>>>>>>>>>> split assignment map as if they were never assigned.
>>>>>>>>>>
>>>>>>>>>> I think this should work, right?
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>>
>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>
>>>>>>>>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Becket and Leonard,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your advice.
>>>>>>>>>>>
>>>>>>>>>>> > put all the reader information in the SplitEnumerator context
>>>>>>>>>>> I have a concern: *the current registeredReaders in
>>>>>>>>>>> SourceCoordinatorContext will be removed after subtaskResetor
>>>>>>>>>>> execution on failure*. However, this approach has merit.
>>>>>>>>>>>
>>>>>>>>>>> One more situation I found my previous design does not cover:
>>>>>>>>>>>
>>>>>>>>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>>>>>>>>    2. Enumerator action: Assigns split 1 to Reader A, and split
>>>>>>>>>>>    2 to Reader B.
>>>>>>>>>>>    3. Failure scenario: Reader A fails before checkpointing.
>>>>>>>>>>>    Since this is a partial failure, only Reader A restarts.
>>>>>>>>>>>    4. Recovery issue: Upon recovery, Reader A re-reports split
>>>>>>>>>>>    (1).
>>>>>>>>>>>
>>>>>>>>>>> In my previous design, the enumerator will ignore Reader A's
>>>>>>>>>>> re-registration which will cause data loss.
>>>>>>>>>>>
>>>>>>>>>>> Thus, when the enumerator receives a split, the split may
>>>>>>>>>>> originate from three scenarios:
>>>>>>>>>>>
>>>>>>>>>>>    1. The split is reported by a reader during a global
>>>>>>>>>>>    restoration.
>>>>>>>>>>>    2. The split is reported by a reader during a partial
>>>>>>>>>>>    failure recovery.
>>>>>>>>>>>    3. The split is not reported by a reader, but is assigned
>>>>>>>>>>>    after the last successful checkpoint and was never acknowledged.
>>>>>>>>>>>
>>>>>>>>>>> In the first scenario (global restore), the split should
>>>>>>>>>>> be re-distributed. For the latter two scenarios (partial failover
>>>>>>>>>>> and post-checkpoint assignment), we need to reassign the split to
>>>>>>>>>>> its originally assigned subtask.
>>>>>>>>>>>
>>>>>>>>>>> By implementing a method in the SplitEnumerator context to track
>>>>>>>>>>> each assigned split's status, the system can correctly identify and
>>>>>>>>>>> resolve split ownership in all three scenarios. *What about adding a
>>>>>>>>>>> `SplitRecoveryType splitRecoveryType(Split split)` in
>>>>>>>>>>> SplitEnumeratorContext?* SplitRecoveryType is an enum including
>>>>>>>>>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER`, and
>>>>>>>>>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>>>>>>>>>
>>>>>>>>>>> What do you think? Are there any details or scenarios I haven't
>>>>>>>>>>> considered? Looking forward to your advice.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Hongshun
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks for the explanation, Hongshun.
>>>>>>>>>>>>
>>>>>>>>>>>> The current pattern for handling new reader registration is:
>>>>>>>>>>>> 1. Put all the reader information in the SplitEnumerator context.
>>>>>>>>>>>> 2. Notify the enumerator about the new reader registration.
>>>>>>>>>>>> 3. Let the split enumerator get whatever information it wants
>>>>>>>>>>>> from the context and do its job.
>>>>>>>>>>>> This pattern decouples the information passing from the reader
>>>>>>>>>>>> registration notification. This makes the API extensible: we can add
>>>>>>>>>>>> more information (e.g. reported assigned splits in our case) about the
>>>>>>>>>>>> reader to the context without introducing new methods.
>>>>>>>>>>>>
>>>>>>>>>>>> Introducing a new method, addSplitsBackOnRecovery(), is redundant
>>>>>>>>>>>> given the above pattern. Do we really need it?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>
>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <
>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > Hi Becket,
>>>>>>>>>>>> >
>>>>>>>>>>>> > > I am curious what would the enumerator do differently for the
>>>>>>>>>>>> > > splits added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>>>>>>>>> >
>>>>>>>>>>>> > In this FLIP, there are two distinct scenarios in which the
>>>>>>>>>>>> > enumerator receives splits being added back:
>>>>>>>>>>>> > 1. Job-level restore: the job is restored, and splits from the
>>>>>>>>>>>> > readers' state are reported by ReaderRegistrationEvent.
>>>>>>>>>>>> > 2. Reader-level restart: a reader is restarted but not the whole
>>>>>>>>>>>> > job, and the splits assigned to it after the last successful
>>>>>>>>>>>> > checkpoint are added back. This is what addSplitsBack used to do.
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > In these two situations, the enumerator will choose different
>>>>>>>>>>>> > strategies.
>>>>>>>>>>>> > 1. Job-level restore: the splits should be redistributed across
>>>>>>>>>>>> > readers according to the current partitioner strategy.
>>>>>>>>>>>> > 2. Reader-level restart: the splits should be reassigned directly
>>>>>>>>>>>> > back to the same reader they were originally assigned to,
>>>>>>>>>>>> > preserving locality and avoiding unnecessary redistribution.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Therefore, the enumerator must clearly distinguish between these
>>>>>>>>>>>> > two scenarios. I originally deprecated the former
>>>>>>>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and added a new
>>>>>>>>>>>> > addSplitsBack(List<SplitT> splits, int subtaskId,
>>>>>>>>>>>> > boolean reportedByReader).
>>>>>>>>>>>> > Leonard suggested using another method, addSplitsBackOnRecovery,
>>>>>>>>>>>> > without affecting the current addSplitsBack.
>>>>>>>>>>>> >
>>>>>>>>>>>> > Best
>>>>>>>>>>>> > Hongshun
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>>>>>>>>> > > Hi Leonard,
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > > Could we introduce a new method like
>>>>>>>>>>>> addSplitsBackOnRecovery  with
>>>>>>>>>>>> > default
>>>>>>>>>>>> > > > implementation. In this way, we can provide better
>>>>>>>>>>>> backward
>>>>>>>>>>>> > compatibility
>>>>>>>>>>>> > > > and also makes it easier for developers to understand.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > I am curious what would the enumerator do differently for
>>>>>>>>>>>> the splits
>>>>>>>>>>>> > added
>>>>>>>>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?  Today,
>>>>>>>>>>>> > addSplitsBack()
>>>>>>>>>>>> > > is also only called upon recovery. So the new method seems
>>>>>>>>>>>> confusing. One
>>>>>>>>>>>> > > thing worth clarifying is if the Source implements
>>>>>>>>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should
>>>>>>>>>>>> the splits
>>>>>>>>>>>> > > reported by the readers also be added back to the
>>>>>>>>>>>> SplitEnumerator via the
>>>>>>>>>>>> > > addSplitsBack() call? Or should the SplitEnumerator
>>>>>>>>>>>> explicitly query the
>>>>>>>>>>>> > > registered reader information via the
>>>>>>>>>>>> SplitEnumeratorContext to get the
>>>>>>>>>>>> > > originally assigned splits when addReader() is invoked? I
>>>>>>>>>>>> was assuming
>>>>>>>>>>>> > the
>>>>>>>>>>>> > > latter in the beginning, so the behavior of addSplitsBack()
>>>>>>>>>>>> remains
>>>>>>>>>>>> > > unchanged, but I am not opposed in doing the former.
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Also, can you elaborate on the backwards compatibility
>>>>>>>>>>>> issue you see if
>>>>>>>>>>>> > we
>>>>>>>>>>>> > > do not have a separate addSplitsBackOnRecovery() method?
>>>>>>>>>>>> Even without
>>>>>>>>>>>> > this
>>>>>>>>>>>> > > new method, the behavior remains exactly the same unless
>>>>>>>>>>>> the end users
>>>>>>>>>>>> > > implement the mix-in interface of
>>>>>>>>>>>> "SupportSplitReassignmentOnRecovery",
>>>>>>>>>>>> > > right?
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Thanks,
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > Jiangjie (Becket) Qin
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <
>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> > > wrote:
>>>>>>>>>>>> > >
>>>>>>>>>>>> > > > Hi devs,
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > It has been quite some time since this FLIP[1] was first
>>>>>>>>>>>> proposed.
>>>>>>>>>>>> > Thank
>>>>>>>>>>>> > > > you for your valuable feedback—based on your suggestions,
>>>>>>>>>>>> the FLIP has
>>>>>>>>>>>> > > > undergone several rounds of revisions.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > Any more advice is welcome and appreciated. If there are
>>>>>>>>>>>> no further
>>>>>>>>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > Best
>>>>>>>>>>>> > > > Hongshun
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > [1]
>>>>>>>>>>>> > > >
>>>>>>>>>>>> >
>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <
>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>> > > >
>>>>>>>>>>>> > > > > Hi Leonard,
>>>>>>>>>>>> > > > > Thanks for your advice.  It makes sense and I have
>>>>>>>>>>>> modified it.
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > Best,
>>>>>>>>>>>> > > > > Hongshun
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> I only have one comment for one API design:
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> > Deprecate the old addSplitsBack  method, use a
>>>>>>>>>>>> addSplitsBack with
>>>>>>>>>>>> > > > >> param isReportedByReader instead. Because, The
>>>>>>>>>>>> enumerator can apply
>>>>>>>>>>>> > > > >> different reassignment policies based on the context.
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Could we introduce a new method like
>>>>>>>>>>>> *addSplitsBackOnRecovery*  with
>>>>>>>>>>>> > > > default
>>>>>>>>>>>> > > > >> implementation. In this way, we can provide better
>>>>>>>>>>>> backward
>>>>>>>>>>>> > > > >> compatibility and also makes it easier for developers
>>>>>>>>>>>> to understand.
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>> > > > >> Leonard
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Hi Becket,
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> I think that's a great idea! I have added the
>>>>>>>>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A
>>>>>>>>>>>> > > > >> Source implementing this interface indicates that the source
>>>>>>>>>>>> > > > >> operator needs to report splits to the enumerator and receive
>>>>>>>>>>>> > > > >> reassignment. [1]
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> Best,
>>>>>>>>>>>> > > > >> Hongshun
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> [1]
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > >
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <
>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>> > > > >>
>>>>>>>>>>>> > > > >>> Hi Hongshun,
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> I think the convention for such optional features in Source is
>>>>>>>>>>>> > > > >>> via mix-in interfaces. So instead of adding a method to the
>>>>>>>>>>>> > > > >>> SourceReader, maybe we should introduce an interface
>>>>>>>>>>>> > > > >>> SupportSplitReassignmentOnRecovery with this method. If a Source
>>>>>>>>>>>> > > > >>> implementation implements that interface, then the
>>>>>>>>>>>> > > > >>> SourceOperator will check the desired behavior and act
>>>>>>>>>>>> > > > >>> accordingly.
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> Thanks,
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>>>>>>>>> > [email protected]
>>>>>>>>>>>> > > > >
>>>>>>>>>>>> > > > >>> wrote:
>>>>>>>>>>>> > > > >>>
>>>>>>>>>>>> > > > >>>> Hi devs,
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd
>>>>>>>>>>>> appreciate your
>>>>>>>>>>>> > feedback
>>>>>>>>>>>> > > > >>>> and suggestions.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Hi Becket,
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Thank you for your detailed feedback. The new
>>>>>>>>>>>> contract makes good
>>>>>>>>>>>> > > > sense
>>>>>>>>>>>> > > > >>>> to me and effectively addresses the issues I
>>>>>>>>>>>> encountered at the
>>>>>>>>>>>> > > > beginning
>>>>>>>>>>>> > > > >>>> of the design.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> That said, I recommend not reporting splits by
>>>>>>>>>>>> default, primarily
>>>>>>>>>>>> > for
>>>>>>>>>>>> > > > >>>> compatibility and practical reasons:
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> >  For these reasons, we do not expect the Split
>>>>>>>>>>>> objects to be
>>>>>>>>>>>> > huge,
>>>>>>>>>>>> > > > >>>> and we are not trying to design for huge Split
>>>>>>>>>>>> objects either as
>>>>>>>>>>>> > they
>>>>>>>>>>>> > > > will
>>>>>>>>>>>> > > > >>>> have problems even today.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>    1. Not all existing connectors match this rule.
>>>>>>>>>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may
>>>>>>>>>>>> > > > >>>>    contain hundreds (or even more) of snapshot split completion
>>>>>>>>>>>> > > > >>>>    records. This state is large and is currently transmitted
>>>>>>>>>>>> > > > >>>>    incrementally through multiple BinlogSplitMetaEvent messages.
>>>>>>>>>>>> > > > >>>>    Since the binlog reader operates with single parallelism,
>>>>>>>>>>>> > > > >>>>    reporting the full split state on recovery could be
>>>>>>>>>>>> > > > >>>>    inefficient or even infeasible.
>>>>>>>>>>>> > > > >>>>    For such sources, it would be better to provide a mechanism
>>>>>>>>>>>> > > > >>>>    to skip split reporting during restart until they redesign
>>>>>>>>>>>> > > > >>>>    and reduce the split size.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
>>>>>>>>>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka
>>>>>>>>>>>> > > > >>>>    connector) do not track or persistently manage unassigned
>>>>>>>>>>>> > > > >>>>    splits. Requiring them to handle re-registration would add
>>>>>>>>>>>> > > > >>>>    unnecessary complexity. Even if we implement this in the
>>>>>>>>>>>> > > > >>>>    Kafka connector, the Kafka connector is currently decoupled
>>>>>>>>>>>> > > > >>>>    from the Flink version, so we also need to make sure older
>>>>>>>>>>>> > > > >>>>    versions remain compatible.
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> To address these concerns, I propose introducing a
>>>>>>>>>>>> new method:
>>>>>>>>>>>> > boolean
>>>>>>>>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a
>>>>>>>>>>>> default
>>>>>>>>>>>> > > > >>>> implementation returning false. This allows source
>>>>>>>>>>>> readers to opt
>>>>>>>>>>>> > in
>>>>>>>>>>>> > > > >>>> to split reassignment only when necessary. Since the
>>>>>>>>>>>> new contract
>>>>>>>>>>>> > > > already
>>>>>>>>>>>> > > > >>>> places the responsibility for split assignment on
>>>>>>>>>>>> the enumerator,
>>>>>>>>>>>> > not
>>>>>>>>>>>> > > > >>>> reporting splits by default is a safe and clean
>>>>>>>>>>>> default behavior.
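
A minimal sketch of the opt-in idea, for illustration only. RecoverableReader and RecoveryDemo are hypothetical stand-ins, not the real Flink SourceReader or SourceOperator, and the sketch assumes that returning false means the restored splits stay with the local reader as they do today.

```java
import java.util.List;

// Hypothetical stand-in for a source reader interface; NOT the real Flink SourceReader.
interface RecoverableReader {
    // Opt-in flag as proposed in the thread; the default keeps the existing behavior.
    default boolean shouldReassignSplitsOnRecovery() {
        return false;
    }
}

final class RecoveryDemo {
    // Hypothetical helper: describes what a restarted operator does with restored splits.
    static String onRestore(RecoverableReader reader, List<String> restoredSplits) {
        if (reader.shouldReassignSplitsOnRecovery()) {
            // Opted in: hand the splits back to the enumerator for reassignment.
            return "report " + restoredSplits + " to enumerator";
        }
        // Default: keep the splits on the reader that restored them.
        return "add " + restoredSplits + " back to local reader";
    }

    public static void main(String[] args) {
        RecoverableReader legacy = new RecoverableReader() {};
        RecoverableReader optIn = new RecoverableReader() {
            @Override
            public boolean shouldReassignSplitsOnRecovery() {
                return true;
            }
        };
        System.out.println(onRestore(legacy, List.of("split-1")));
        System.out.println(onRestore(optIn, List.of("split-1")));
    }
}
```

Connectors that never override the method are unaffected, which is the compatibility property the proposal is after.
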
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> ------------------------------
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> I’ve updated the implementation and the FLIP
>>>>>>>>>>>> accordingly [1]. It is
>>>>>>>>>>>> > quite a
>>>>>>>>>>>> > > > >>>> big change. In particular, for the Kafka connector,
>>>>>>>>>>>> we can now use
>>>>>>>>>>>> > a
>>>>>>>>>>>> > > > >>>> pluggable SplitPartitioner to support different
>>>>>>>>>>>> split assignment
>>>>>>>>>>>> > > > >>>> strategies (e.g., default, round-robin).
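
To make the pluggable strategy idea concrete, here is a rough sketch. The SplitPartitioner name comes from the thread, but the method signature below is an assumption and may differ from the FLIP; split IDs are modeled as plain strings. Both strategies take the full split list and the reader count, so the same inputs always produce the same assignment.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical interface; the real SplitPartitioner signature in the FLIP may differ.
interface SplitPartitioner {
    // Maps every split ID to a reader index in [0, numReaders).
    Map<String, Integer> partition(List<String> splitIds, int numReaders);
}

final class Partitioners {
    // Hash-based: a split's owner depends only on its ID and the reader count.
    static final SplitPartitioner HASH = (splits, n) -> {
        Map<String, Integer> out = new HashMap<>();
        for (String s : splits) {
            out.put(s, Math.floorMod(s.hashCode(), n));
        }
        return out;
    };

    // Round-robin over the sorted global split list: balanced and repeatable
    // as long as the whole list is partitioned in one pass.
    static final SplitPartitioner ROUND_ROBIN = (splits, n) -> {
        List<String> sorted = new ArrayList<>(splits);
        Collections.sort(sorted);
        Map<String, Integer> out = new HashMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            out.put(sorted.get(i), i % n);
        }
        return out;
    };

    public static void main(String[] args) {
        System.out.println(ROUND_ROBIN.partition(List.of("split-2", "split-1"), 2));
    }
}
```

Round-robin only stays balanced here because it sees all splits at once; feeding it one returned split at a time is where the imbalance discussed in this thread comes from.
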
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Best,
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> Hongshun
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> [1]
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > >
>>>>>>>>>>>> >
>>>>>>>>>>>> >
>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <
>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> > > > wrote:
>>>>>>>>>>>> > > > >>>>
>>>>>>>>>>>> > > > >>>>> Hi Hongshun,
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> I am not too concerned about the transmission cost.
>>>>>>>>>>>> Because the
>>>>>>>>>>>> > full
>>>>>>>>>>>> > > > >>>>> split transmission has to happen in the initial
>>>>>>>>>>>> assignment phase
>>>>>>>>>>>> > > > already.
>>>>>>>>>>>> > > > >>>>> And in the future, we probably want to also
>>>>>>>>>>>> introduce some kind
>>>>>>>>>>>> > of
>>>>>>>>>>>> > > > workload
>>>>>>>>>>>> > > > >>>>> balance across source readers, e.g. based on the
>>>>>>>>>>>> per-split
>>>>>>>>>>>> > > > throughput or
>>>>>>>>>>>> > > > >>>>> the per-source-reader workload in heterogeneous
>>>>>>>>>>>> clusters. For
>>>>>>>>>>>> > these
>>>>>>>>>>>> > > > >>>>> reasons, we do not expect the Split objects to be
>>>>>>>>>>>> huge, and we
>>>>>>>>>>>> > are
>>>>>>>>>>>> > > > not
>>>>>>>>>>>> > > > >>>>> trying to design for huge Split objects either as
>>>>>>>>>>>> they will have
>>>>>>>>>>>> > > > problems
>>>>>>>>>>>> > > > >>>>> even today.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Good point on the potential split loss, please see
>>>>>>>>>>>> the reply
>>>>>>>>>>>> > below:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Scenario 2:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B
>>>>>>>>>>>> reports (3
>>>>>>>>>>>> > and 4)
>>>>>>>>>>>> > > > >>>>>> upon restart.
>>>>>>>>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and
>>>>>>>>>>>> performs
>>>>>>>>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>>>>>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both
>>>>>>>>>>>> readers have
>>>>>>>>>>>> > empty
>>>>>>>>>>>> > > > >>>>>> states.
>>>>>>>>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four
>>>>>>>>>>>> splits are
>>>>>>>>>>>> > lost.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> The reader registration happens in the
>>>>>>>>>>>> SourceOperator.open(),
>>>>>>>>>>>> > which
>>>>>>>>>>>> > > > >>>>> means the task is still in the initializing state,
>>>>>>>>>>>> therefore the
>>>>>>>>>>>> > > > checkpoint
>>>>>>>>>>>> > > > >>>>> should not be triggered until the enumerator
>>>>>>>>>>>> receives all the
>>>>>>>>>>>> > split
>>>>>>>>>>>> > > > reports.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from
>>>>>>>>>>>> the TM to the JM
>>>>>>>>>>>> > is
>>>>>>>>>>>> > > > >>>>> async. So it is possible that the
>>>>>>>>>>>> SourceOperator.open() has
>>>>>>>>>>>> > returned,
>>>>>>>>>>>> > > > but
>>>>>>>>>>>> > > > >>>>> the enumerator has not received the split reports.
>>>>>>>>>>>> However,
>>>>>>>>>>>> > because
>>>>>>>>>>>> > > > the
>>>>>>>>>>>> > > > >>>>> task status update RPC call goes through the same
>>>>>>>>>>>> channel as the split
>>>>>>>>>>>> > > > reports
>>>>>>>>>>>> > > > >>>>> call, the task status RPC call will happen after
>>>>>>>>>>>> the split
>>>>>>>>>>>> > > > reports call
>>>>>>>>>>>> > > > >>>>> on the JM side. Therefore, on the JM side, the
>>>>>>>>>>>> SourceCoordinator
>>>>>>>>>>>> > will
>>>>>>>>>>>> > > > >>>>> always first receive the split reports, then
>>>>>>>>>>>> receive the
>>>>>>>>>>>> > checkpoint
>>>>>>>>>>>> > > > request.
>>>>>>>>>>>> > > > >>>>> This "happens-before" relationship is kind of
>>>>>>>>>>>> important to
>>>>>>>>>>>> > guarantee
>>>>>>>>>>>> > > > >>>>> the consistent state between enumerator and readers.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Scenario 1:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits
>>>>>>>>>>>> (1 and 2), and
>>>>>>>>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>>>>>>>>> > > > >>>>>> 2. The enumerator receives these reports but only
>>>>>>>>>>>> reassigns
>>>>>>>>>>>> > splits 1
>>>>>>>>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>>>>>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered.
>>>>>>>>>>>> Only splits 1
>>>>>>>>>>>> > and 2
>>>>>>>>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and 4
>>>>>>>>>>>> are not
>>>>>>>>>>>> > persisted.
>>>>>>>>>>>> > > > >>>>>> 4. If the job is later restarted from this
>>>>>>>>>>>> checkpoint, splits 3
>>>>>>>>>>>> > and
>>>>>>>>>>>> > > > 4
>>>>>>>>>>>> > > > >>>>>> will be permanently lost.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> This scenario is possible. One solution is to let
>>>>>>>>>>>> the enumerator
>>>>>>>>>>>> > > > >>>>> implementation handle this. That means if the
>>>>>>>>>>>> enumerator relies
>>>>>>>>>>>> > on
>>>>>>>>>>>> > > > the
>>>>>>>>>>>> > > > >>>>> initial split reports from the source readers, it
>>>>>>>>>>>> should maintain
>>>>>>>>>>>> > > > these
>>>>>>>>>>>> > > > >>>>> reports by itself. In the above example, the
>>>>>>>>>>>> enumerator will need
>>>>>>>>>>>> > to
>>>>>>>>>>>> > > > >>>>> remember that 3 and 4 are not assigned and put them
>>>>>>>>>>>> into its own
>>>>>>>>>>>> > state.
>>>>>>>>>>>> > > > >>>>> The current contract is that anything assigned to
>>>>>>>>>>>> the
>>>>>>>>>>>> > SourceReaders
>>>>>>>>>>>> > > > >>>>> is completely owned by the SourceReaders.
>>>>>>>>>>>> Enumerators can
>>>>>>>>>>>> > remember
>>>>>>>>>>>> > > > the
>>>>>>>>>>>> > > > >>>>> assignments but cannot change them, even when the
>>>>>>>>>>>> source reader
>>>>>>>>>>>> > > > recovers /
>>>>>>>>>>>> > > > >>>>> restarts.
>>>>>>>>>>>> > > > >>>>> With this FLIP, the contract becomes that the
>>>>>>>>>>>> source readers will
>>>>>>>>>>>> > > > >>>>> return the ownership of the splits to the
>>>>>>>>>>>> enumerator. So the
>>>>>>>>>>>> > > > enumerator is
>>>>>>>>>>>> > > > >>>>> responsible for maintaining these splits, until
>>>>>>>>>>>> they are assigned
>>>>>>>>>>>> > to
>>>>>>>>>>>> > > > a
>>>>>>>>>>>> > > > >>>>> source reader again.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> There are other cases where there may be conflicting
>>>>>>>>>>>> information
>>>>>>>>>>>> > between
>>>>>>>>>>>> > > > >>>>> reader and enumerator. For example, consider the
>>>>>>>>>>>> following
>>>>>>>>>>>> > sequence:
>>>>>>>>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) upon restart.
>>>>>>>>>>>> > > > >>>>> 2. enumerator receives the report and assigns both
>>>>>>>>>>>> 1 and 2 to
>>>>>>>>>>>> > reader
>>>>>>>>>>>> > > > B.
>>>>>>>>>>>> > > > >>>>> 3. reader A fails before checkpointing. And this
>>>>>>>>>>>> is a partial
>>>>>>>>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>>>>>>>>> > > > >>>>> 4. When reader A recovers, it will again report
>>>>>>>>>>>> splits (1 and 2)
>>>>>>>>>>>> > to
>>>>>>>>>>>> > > > >>>>> the enumerator.
>>>>>>>>>>>> > > > >>>>> 5. The enumerator should ignore this report because
>>>>>>>>>>>> it has
>>>>>>>>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> So with the new contract, the enumerator should be
>>>>>>>>>>>> the source of
>>>>>>>>>>>> > > > truth
>>>>>>>>>>>> > > > >>>>> for split ownership.
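
The bookkeeping this implies on the enumerator side could look roughly like the sketch below. SplitOwnershipLedger is a hypothetical helper, not a Flink class; split IDs are strings and reader IDs are ints purely for illustration. It covers both cases above: splits that were reported but not yet reassigned stay in enumerator-owned state, and a repeated report for splits already given to another reader is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical bookkeeping class; not part of Flink.
final class SplitOwnershipLedger {
    // split ID -> reader the enumerator has assigned it to
    private final Map<String, Integer> owner = new HashMap<>();
    // splits returned to the enumerator and not yet reassigned
    private final Set<String> pending = new LinkedHashSet<>();

    // Called when a restarted reader reports the splits it restored from state.
    // Returns the splits accepted back; splits owned by another reader are ignored.
    List<String> onReaderReport(int readerId, List<String> reported) {
        List<String> accepted = new ArrayList<>();
        for (String split : reported) {
            Integer current = owner.get(split);
            if (current != null && current != readerId) {
                continue; // stale report: the enumerator already moved this split
            }
            owner.remove(split);
            pending.add(split);
            accepted.add(split);
        }
        return accepted;
    }

    // Records that the enumerator handed a split to a reader.
    void assign(String split, int readerId) {
        pending.remove(split);
        owner.put(split, readerId);
    }

    // Splits the enumerator would have to include in its own checkpoint state.
    List<String> snapshotPending() {
        return new ArrayList<>(pending);
    }

    public static void main(String[] args) {
        SplitOwnershipLedger ledger = new SplitOwnershipLedger();
        ledger.onReaderReport(0, List.of("1", "2")); // reader A
        ledger.onReaderReport(1, List.of("3", "4")); // reader B
        ledger.assign("1", 1);
        ledger.assign("2", 1);
        System.out.println("pending at checkpoint: " + ledger.snapshotPending());
        System.out.println("accepted from restarted A: " + ledger.onReaderReport(0, List.of("1", "2")));
    }
}
```

With snapshotPending() written into the enumerator checkpoint, splits 3 and 4 in Scenario 1 survive a restart even though no reader holds them yet.
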
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Thanks,
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>>>>>>>>> > > > [email protected]>
>>>>>>>>>>>> > > > >>>>> wrote:
>>>>>>>>>>>> > > > >>>>>
>>>>>>>>>>>> > > > >>>>>> Hi Becket,
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> I did consider this approach at the beginning (and
>>>>>>>>>>>> it was also
>>>>>>>>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more
>>>>>>>>>>>> flexibility
>>>>>>>>>>>> > in
>>>>>>>>>>>> > > > >>>>>> reassigning all splits. However, there are a few
>>>>>>>>>>>> potential
>>>>>>>>>>>> > issues.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>>>>>>>>> > > > >>>>>> If we pass the full split objects (rather than
>>>>>>>>>>>> just split IDs),
>>>>>>>>>>>> > the
>>>>>>>>>>>> > > > >>>>>> data size could be significant, leading to high
>>>>>>>>>>>> overhead during
>>>>>>>>>>>> > > > >>>>>> transmission — especially when many splits are
>>>>>>>>>>>> involved.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Risk of split loss exists unless we have a
>>>>>>>>>>>> mechanism to make
>>>>>>>>>>>> > sure
>>>>>>>>>>>> > > > >>>>>> a checkpoint can only happen after all the splits are
>>>>>>>>>>>> reassigned.
>>>>>>>>>>>> > > > >>>>>> There are scenarios where splits could be lost due
>>>>>>>>>>>> to
>>>>>>>>>>>> > inconsistent
>>>>>>>>>>>> > > > >>>>>> state handling during recovery:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Scenario 1:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned
>>>>>>>>>>>> splits (1 and 2),
>>>>>>>>>>>> > and
>>>>>>>>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>>>>>>>>> > > > >>>>>>    2. The enumerator receives these reports but
>>>>>>>>>>>> only reassigns
>>>>>>>>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>>>>>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered.
>>>>>>>>>>>> Only splits 1
>>>>>>>>>>>> > and
>>>>>>>>>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3
>>>>>>>>>>>> and 4 are not
>>>>>>>>>>>> > > > persisted.
>>>>>>>>>>>> > > > >>>>>>    4. If the job is later restarted from this
>>>>>>>>>>>> checkpoint, splits
>>>>>>>>>>>> > 3
>>>>>>>>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Scenario 2:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and
>>>>>>>>>>>> Reader B reports (3
>>>>>>>>>>>> > and
>>>>>>>>>>>> > > > >>>>>>    4) upon restart.
>>>>>>>>>>>> > > > >>>>>>    2. Before the enumerator receives all reports
>>>>>>>>>>>> and performs
>>>>>>>>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>>>>>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet,
>>>>>>>>>>>> both readers
>>>>>>>>>>>> > have
>>>>>>>>>>>> > > > >>>>>>    empty states.
>>>>>>>>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all
>>>>>>>>>>>> four splits are
>>>>>>>>>>>> > > > lost.
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Let me know if you have thoughts on how we might
>>>>>>>>>>>> mitigate these
>>>>>>>>>>>> > > > risks!
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> Best
>>>>>>>>>>>> > > > >>>>>> Hongshun
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>>>>>>>>> [email protected]>
>>>>>>>>>>>> > > > >>>>>> wrote:
>>>>>>>>>>>> > > > >>>>>>
>>>>>>>>>>>> > > > >>>>>>> Hi Hongshun,
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In
>>>>>>>>>>>> terms of the
>>>>>>>>>>>> > > > updated
>>>>>>>>>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can keep
>>>>>>>>>>>> the protocol
>>>>>>>>>>>> > > > simple. One
>>>>>>>>>>>> > > > >>>>>>> alternative way to achieve this behavior is
>>>>>>>>>>>> the following:
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the
>>>>>>>>>>>> SourceOperator sends
>>>>>>>>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently
>>>>>>>>>>>> assigned splits to
>>>>>>>>>>>> > the
>>>>>>>>>>>> > > > >>>>>>> enumerator. It does not add these splits to the
>>>>>>>>>>>> SourceReader.
>>>>>>>>>>>> > > > >>>>>>> 2. The enumerator will always use the
>>>>>>>>>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign
>>>>>>>>>>>> the splits.
>>>>>>>>>>>> > (not
>>>>>>>>>>>> > > > via the
>>>>>>>>>>>> > > > >>>>>>> response of the ReaderRegistrationEvent, this
>>>>>>>>>>>> allows async
>>>>>>>>>>>> > split
>>>>>>>>>>>> > > > assignment
>>>>>>>>>>>> > > > >>>>>>> in case the enumerator wants to wait until all
>>>>>>>>>>>> the readers are
>>>>>>>>>>>> > > > registered)
>>>>>>>>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>>>>>>>>> SourceReader.addSplits()
>>>>>>>>>>>> > when
>>>>>>>>>>>> > > > >>>>>>> it receives the AddSplitEvent from the enumerator.
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>>>>>>>>> > > > >>>>>>> 1. it basically allows arbitrary split
>>>>>>>>>>>> reassignment upon
>>>>>>>>>>>> > restart
>>>>>>>>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign
>>>>>>>>>>>> splits.
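
The three steps can be modeled in a few lines. This is a toy model under stated assumptions: Reader and Operator below are hypothetical classes standing in for SourceReader and SourceOperator, and the events are reduced to plain method calls. The point it illustrates is that restored splits only travel in the registration payload, and addSplits() is reached by exactly one path.

```java
import java.util.ArrayList;
import java.util.List;

final class ProtocolSketch {
    // Hypothetical reader: only records the splits handed to addSplits().
    static final class Reader {
        final List<String> active = new ArrayList<>();

        void addSplits(List<String> splits) {
            active.addAll(splits);
        }
    }

    // Hypothetical operator wrapping one reader.
    static final class Operator {
        final Reader reader = new Reader();
        private final List<String> restored;

        Operator(List<String> restoredFromState) {
            // Restored splits are kept aside; they are NOT added to the reader here.
            this.restored = List.copyOf(restoredFromState);
        }

        // Step 1: the registration carries the currently assigned splits.
        List<String> registrationPayload() {
            return restored;
        }

        // Step 3: the only place where the reader receives splits.
        void onAddSplitEvent(List<String> assigned) {
            reader.addSplits(assigned);
        }
    }

    public static void main(String[] args) {
        Operator op = new Operator(List.of("s1", "s2"));
        System.out.println("reported: " + op.registrationPayload());
        System.out.println("active before assignment: " + op.reader.active);
        // Step 2 happens on the enumerator; here it decides to hand back only s2.
        op.onAddSplitEvent(List.of("s2"));
        System.out.println("active after assignment: " + op.reader.active);
    }
}
```
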
>>>>>>>>>>>> > > > >>>>>>>
>>>>>>>>>>>> > > > >>>>>>> So we only need one interface change:
>>>>>>>>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo
>>>>>>>>>>>> so the
>>>>>>>>>>>> > Enumerator
>>>>>>>>>>>> > > > >>>>>>> can access it.
>>>>>>>>>>>> > > > >>>>>>> and one behavior change:
>>>>>>>>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits
>>>>>>>>>>>> to the from
>>>>>>>>>>>> > state
>>>>>>>>>>>> > > > >>>>>>> restoration, but
>>>>>>>>>>>> > [message truncated...]
>>>>>>>>>>>> >
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>
