Hi Becket,

Got it. You're suggesting we should not handle this in the source
framework, but instead let the split enumerator manage these three scenarios.


Let me explain why I originally favored handling it in the framework: I'm
concerned that connector developers might overlook certain edge cases
(after all, it took us extensive discussions to fully clarify the logic).


However, your approach keeps the framework cleaner and more flexible, so I
will take it.


Perhaps, in this FLIP, we should focus on providing comprehensive guidance
for connector developers: explain how to implement a split enumerator,
including the underlying challenges and their solutions.
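
For example, the guide could start from a minimal skeleton like the
following (purely illustrative; the class and names are hypothetical, not
part of the FLIP, and the logic follows the three scenarios discussed
below):

    import java.io.IOException;
    import java.util.*;
    import org.apache.flink.api.connector.source.SourceSplit;
    import org.apache.flink.api.connector.source.SplitEnumerator;
    import org.apache.flink.api.connector.source.SplitEnumeratorContext;

    // Sketch of an enumerator that tracks its own split assignment map.
    public class TrackingEnumerator<SplitT extends SourceSplit>
            implements SplitEnumerator<SplitT, Void> {

        private final SplitEnumeratorContext<SplitT> context;
        // subtask id -> splits currently believed to be owned by that reader
        private final Map<Integer, Set<SplitT>> assignment = new HashMap<>();
        // splits currently owned by the enumerator
        private final Set<SplitT> unassigned = new HashSet<>();

        public TrackingEnumerator(SplitEnumeratorContext<SplitT> context) {
            this.context = context;
        }

        @Override
        public void addSplitsBack(List<SplitT> splits, int subtaskId) {
            // Splits assigned after the last successful checkpoint come back
            // before addReader(); forget them as if they were never assigned.
            Set<SplitT> owned = assignment.get(subtaskId);
            if (owned != null) {
                owned.removeAll(splits);
            }
            unassigned.addAll(splits);
        }

        @Override
        public void addReader(int subtaskId) {
            Set<SplitT> owned =
                    assignment.getOrDefault(subtaskId, Collections.emptySet());
            if (owned.isEmpty()) {
                // Global restore or a brand-new reader: redistribute splits
                // according to the partitioner strategy.
            } else {
                // Partial failover: re-assign the same splits, or different
                // ones if the source supports reassignment on recovery.
            }
        }

        @Override public void start() {}
        @Override public void handleSplitRequest(int subtaskId, String host) {}
        @Override public Void snapshotState(long checkpointId) { return null; }
        @Override public void close() throws IOException {}
    }

The guide would then walk through when to re-assign the same splits versus
redistribute them, which is exactly the subtle part we needed this
discussion to clarify.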



Additionally, we can use the Kafka connector as a reference
implementation to demonstrate the practical steps. This way, developers who
want to implement similar connectors can directly reference this example.
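
As part of that, the pluggable SplitPartitioner mentioned earlier in this
thread could be the extension point the example documents. A rough shape,
as a sketch only (the real interface and naming belong to the FLIP and the
connector code):

    import java.util.*;

    // Hypothetical shape of a pluggable split assignment strategy.
    public interface SplitPartitioner<SplitT> {
        // Decide which subtask each split should go to.
        Map<Integer, List<SplitT>> partition(List<SplitT> splits, int parallelism);
    }

    // e.g. a simple round-robin strategy
    class RoundRobinPartitioner<SplitT> implements SplitPartitioner<SplitT> {
        @Override
        public Map<Integer, List<SplitT>> partition(List<SplitT> splits, int parallelism) {
            Map<Integer, List<SplitT>> result = new HashMap<>();
            for (int i = 0; i < splits.size(); i++) {
                result.computeIfAbsent(i % parallelism, k -> new ArrayList<>())
                        .add(splits.get(i));
            }
            return result;
        }
    }

Round-robin is just one choice; the reference implementation could show
the default strategy side by side.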



Best,

Hongshun




On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:

> It would be good to not expose runtime details to the source
> implementation if possible.
>
> Today, the split enumerator implementations are expected to track the
> split assignment.
>
> Assuming the split enumerator implementation keeps a split assignment map,
> that means the enumerator should already know whether a split is assigned
> or unassigned. So it can handle the three scenarios you mentioned.
>
>> The split is reported by a reader during a global restoration.
>
> The split enumerator should have just been restored / created. If the
> enumerator expects a full reassignment of splits upon global recovery,
> there should be no assigned splits to that reader in the split assignment
> mapping.
>
>> The split is reported by a reader during a partial failure recovery.
>
> In this case, when SplitEnumerator.addReader() is invoked, the split
> assignment map in the enumerator implementation should already have some
> split assignments for the reader. Therefore it is a partial failover. If
> the source supports split reassignment on recovery, the enumerator can
> assign splits that are different from the reported assignment of that
> reader in the SplitEnumeratorContext, or it can also assign the same
> splits. In any case, the enumerator knows that this is a partial recovery
> because the assignment map is non-empty.
>
>> The split is not reported by a reader, but is assigned after the last
>> successful checkpoint and was never acknowledged.
>
> This is actually one of the steps in the partial failure recovery.
> SplitEnumerator.addSplitsBack() will be called first, before
> SplitEnumerator.addReader() is called for the recovered reader. When
> SplitEnumerator.addSplitsBack() is invoked, it is certainly a partial
> recovery, and the enumerator should remove these splits from the split
> assignment map as if they were never assigned.
>
> I think this should work, right?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]>
> wrote:
>
>> Hi Becket and Leonard,
>>
>> Thanks for your advice.
>>
>> > put all the reader information in the SplitEnumerator context
>>
>> I have a concern: the current registeredReaders in
>> SourceCoordinatorContext will be removed after subtask reset is executed on
>> failure. However, this approach has merit.
>>
>>
>> One more situation I found my previous design does not cover:
>>
>>    1. Initial state: Reader A reports splits (1, 2).
>>    2. Enumerator action: Assigns split 1 to Reader A, and split 2 to
>>    Reader B.
>>    3. Failure scenario: Reader A fails before checkpointing. Since this
>>    is a partial failure, only Reader A restarts.
>>    4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>
>> In my previous design, the enumerator would ignore Reader A's
>> re-registration, which would cause data loss.
>>
>> Thus, when the enumerator receives a split, the split may originate
>> from three scenarios:
>>
>>    1. The split is reported by a reader during a global restoration.
>>    2. The split is reported by a reader during a partial failure
>>    recovery.
>>    3. The split is not reported by a reader, but is assigned after the
>>    last successful checkpoint and was never acknowledged.
>>
>> In the first scenario (global restore), the split should
>> be re-distributed. For the latter two scenarios (partial failover and
>> post-checkpoint assignment), we need to reassign the split to
>> its originally assigned subtask.
>>
>>
>> By implementing a method in the SplitEnumerator context to track each
>> assigned split's status, the system can correctly identify and resolve
>> split ownership in all three scenarios. What about adding a
>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>> SplitEnumeratorContext? SplitRecoveryType is an enum including
>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER` and
>> `POST_CHECKPOINT_ASSIGNMENT`.
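>>
>> In code, the proposal would look roughly like this (a sketch of the shape
>> only; the exact naming and generics would be settled in the FLIP):
>>
>>     public enum SplitRecoveryType {
>>         UNASSIGNED,
>>         GLOBAL_RESTORE,
>>         PARTIAL_FAILOVER,
>>         POST_CHECKPOINT_ASSIGNMENT
>>     }
>>
>>     // added to SplitEnumeratorContext
>>     SplitRecoveryType splitRecoveryType(Split split);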
>>
>>
>> What do you think? Are there any details or scenarios I haven't
>> considered? Looking forward to your advice.
>>
>>
>> Best,
>>
>> Hongshun
>>
>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:
>>
>>> Thanks for the explanation, Hongshun.
>>>
>>> The current pattern of handling new reader registration is the following:
>>> 1. put all the reader information in the SplitEnumerator context
>>> 2. notify the enumerator about the new reader registration.
>>> 3. Let the split enumerator get whatever information it wants from the
>>> context and do its job.
>>> This pattern decouples the information passing from the reader
>>> registration notification. This makes the API extensible: we can add more
>>> information (e.g. reported assigned splits in our case) about the reader
>>> to the context without introducing new methods.
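>>>
>>> For example, with this pattern the enumerator would do something like the
>>> following in addReader() (registeredReaders() exists today; the reported
>>> splits on ReaderInfo would be the new addition):
>>>
>>>     @Override
>>>     public void addReader(int subtaskId) {
>>>         ReaderInfo info = context.registeredReaders().get(subtaskId);
>>>         // read what the reader reported (location and, with this FLIP,
>>>         // its previously assigned splits) and assign splits accordingly
>>>     }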
>>>
>>> Introducing a new method addSplitsBackOnRecovery() is redundant to the
>>> above pattern. Do we really need it?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>> > Hi Becket,
>>> >
>>> > > I am curious what would the enumerator do differently for the splits
>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>> >
>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>> > receives splits being added back:
>>> > 1. Job-level restore: the whole job is restored, and splits from each
>>> > reader's state are reported via ReaderRegistrationEvent.
>>> > 2. Reader-level restart: a single reader restarts (not the whole job),
>>> > and the splits assigned to it after the last successful checkpoint are
>>> > added back. This is what addSplitsBack used to do.
>>> >
>>> >
>>> > In these two situations, the enumerator will choose different
>>> > strategies:
>>> > 1. Job-level restore: the splits should be redistributed across readers
>>> > according to the current partitioner strategy.
>>> > 2. Reader-level restart: the splits should be reassigned directly back
>>> > to the same reader they were originally assigned to, preserving
>>> > locality and avoiding unnecessary redistribution.
>>> >
>>> > Therefore, the enumerator must clearly distinguish between these two
>>> > scenarios. My earlier proposal deprecated the existing
>>> > addSplitsBack(List<SplitT> splits, int subtaskId) and added a new
>>> > addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
>>> > Leonard suggested using a separate method, addSplitsBackOnRecovery,
>>> > without changing the current addSplitsBack.
>>> >
>>> > Best
>>> > Hongshun
>>> >
>>> >
>>> >
>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>> > > Hi Leonard,
>>> > >
>>> > >
>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a
>>> > > > default implementation. In this way, we can provide better backward
>>> > > > compatibility and also makes it easier for developers to understand.
>>> > >
>>> > >
>>> > > I am curious what the enumerator would do differently for the splits
>>> > > added via addSplitsBackOnRecovery() vs. addSplitsBack()? Today,
>>> > > addSplitsBack() is also only called upon recovery. So the new method
>>> > > seems confusing. One thing worth clarifying is: if the Source
>>> > > implements SupportSplitReassignmentOnRecovery, upon recovery, should
>>> > > the splits reported by the readers also be added back to the
>>> > > SplitEnumerator via the addSplitsBack() call? Or should the
>>> > > SplitEnumerator explicitly query the registered reader information via
>>> > > the SplitEnumeratorContext to get the originally assigned splits when
>>> > > addReader() is invoked? I was assuming the latter in the beginning, so
>>> > > the behavior of addSplitsBack() remains unchanged, but I am not
>>> > > opposed to doing the former.
>>> > >
>>> > > Also, can you elaborate on the backwards compatibility issue you see
>>> > > if we do not have a separate addSplitsBackOnRecovery() method? Even
>>> > > without this new method, the behavior remains exactly the same unless
>>> > > the end users implement the mix-in interface of
>>> > > "SupportSplitReassignmentOnRecovery", right?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jiangjie (Becket) Qin
>>> > >
>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
>>> > > wrote:
>>> > >
>>> > > > Hi devs,
>>> > > >
>>> > > > It has been quite some time since this FLIP [1] was first proposed.
>>> > > > Thank you for your valuable feedback; based on your suggestions, the
>>> > > > FLIP has undergone several rounds of revisions.
>>> > > >
>>> > > > Any more advice is welcome and appreciated. If there are no further
>>> > > > concerns, I plan to start the vote tomorrow.
>>> > > >
>>> > > > Best
>>> > > > Hongshun
>>> > > >
>>> > > > [1]
>>> > > >
>>> >
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>> > > >
>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
>>> > > > wrote:
>>> > > >
>>> > > > > Hi Leonard,
>>> > > > > Thanks for your advice.  It makes sense and I have modified it.
>>> > > > >
>>> > > > > Best,
>>> > > > > Hongshun
>>> > > > >
>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]>
>>> wrote:
>>> > > > >
>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>> > > > >>
>>> > > > >> I only have one comment for one API design:
>>> > > > >>
>>> > > > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with
>>> > > > >> > param isReportedByReader instead, because the enumerator can apply
>>> > > > >> > different reassignment policies based on the context.
>>> > > > >>
>>> > > > >> Could we introduce a new method like addSplitsBackOnRecovery with a
>>> > > > >> default implementation? In this way, we can provide better backward
>>> > > > >> compatibility and also make it easier for developers to understand.
>>> > > > >>
>>> > > > >> Best,
>>> > > > >> Leonard
>>> > > > >>
>>> > > > >>
>>> > > > >>
>>> > > > >> On Sep 3, 2025 at 20:26, Hongshun Wang <[email protected]> wrote:
>>> > > > >>
>>> > > > >> Hi Becket,
>>> > > > >>
>>> > > > >> I think that's a great idea! I have added the
>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source
>>> > > > >> implementing this interface indicates that the source operator needs
>>> > > > >> to report splits to the enumerator and receive reassignment. [1]
>>> > > > >>
>>> > > > >> Best,
>>> > > > >> Hongshun
>>> > > > >>
>>> > > > >> [1]
>>> > > > >>
>>> > > >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>> > > > >>
>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]>
>>> > > > wrote:
>>> > > > >>
>>> > > > >>> Hi Hongshun,
>>> > > > >>>
>>> > > > >>> I think the convention for such optional features in Source is via
>>> > > > >>> mix-in interfaces. So instead of adding a method to the
>>> > > > >>> SourceReader, maybe we should introduce an interface
>>> > > > >>> SupportSplitReassignmentOnRecovery with this method. If a Source
>>> > > > >>> implementation implements that interface, then the SourceOperator
>>> > > > >>> will check the desired behavior and act accordingly.
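>>> > > > >>>
>>> > > > >>> For example (a sketch only; whether it is a pure marker or also
>>> > > > >>> carries the opt-in method is up to the FLIP):
>>> > > > >>>
>>> > > > >>>     // mix-in: a Source implementing this asks the SourceOperator
>>> > > > >>>     // to report restored splits and accept reassignment
>>> > > > >>>     public interface SupportSplitReassignmentOnRecovery {
>>> > > > >>>     }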
>>> > > > >>>
>>> > > > >>> Thanks,
>>> > > > >>>
>>> > > > >>> Jiangjie (Becket) Qin
>>> > > > >>>
>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang
>>> > > > >>> <[email protected]> wrote:
>>> > > > >>>
>>> > > > >>>> Hi devs,
>>> > > > >>>>
>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your
>>> > > > >>>> feedback and suggestions.
>>> > > > >>>>
>>> > > > >>>> Best,
>>> > > > >>>> Hongshun
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> On Aug 13, 2025 at 14:23, Hongshun Wang <[email protected]> wrote:
>>> > > > >>>>
>>> > > > >>>> Hi Becket,
>>> > > > >>>>
>>> > > > >>>> Thank you for your detailed feedback. The new contract makes good
>>> > > > >>>> sense to me and effectively addresses the issues I encountered at
>>> > > > >>>> the beginning of the design.
>>> > > > >>>>
>>> > > > >>>> That said, I recommend not reporting splits by default, primarily
>>> > > > >>>> for compatibility and practical reasons:
>>> > > > >>>>
>>> > > > >>>> > For these reasons, we do not expect the Split objects to be
>>> > > > >>>> > huge, and we are not trying to design for huge Split objects
>>> > > > >>>> > either as they will have problems even today.
>>> > > > >>>>
>>> > > > >>>>    1. Not all existing connectors match this rule.
>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may
>>> > > > >>>>    contain hundreds (or even more) snapshot split completion
>>> > > > >>>>    records. This state is large and is currently transmitted
>>> > > > >>>>    incrementally through multiple BinlogSplitMetaEvent messages.
>>> > > > >>>>    Since the binlog reader operates with single parallelism,
>>> > > > >>>>    reporting the full split state on recovery could be
>>> > > > >>>>    inefficient or even infeasible. For such sources, it would be
>>> > > > >>>>    better to provide a mechanism to skip split reporting during
>>> > > > >>>>    restart until they redesign and reduce the split size.
>>> > > > >>>>
>>> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka
>>> > > > >>>>    connector) do not track or persistently manage unassigned
>>> > > > >>>>    splits. Requiring them to handle re-registration would add
>>> > > > >>>>    unnecessary complexity. Even if we implement this in the
>>> > > > >>>>    Kafka connector, the Kafka connector is currently decoupled
>>> > > > >>>>    from the Flink version, so we also need to keep older
>>> > > > >>>>    versions compatible.
>>> > > > >>>>
>>> > > > >>>> ------------------------------
>>> > > > >>>>
>>> > > > >>>> To address these concerns, I propose introducing a new method,
>>> > > > >>>> boolean SourceReader#shouldReassignSplitsOnRecovery(), with a
>>> > > > >>>> default implementation returning false. This allows source readers
>>> > > > >>>> to opt in to split reassignment only when necessary. Since the new
>>> > > > >>>> contract already places the responsibility for split assignment on
>>> > > > >>>> the enumerator, not reporting splits by default is a safe and clean
>>> > > > >>>> default behavior.
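>>> > > > >>>>
>>> > > > >>>> In other words, a sketch of the proposed default:
>>> > > > >>>>
>>> > > > >>>>     // on SourceReader; readers opt in by overriding this
>>> > > > >>>>     default boolean shouldReassignSplitsOnRecovery() {
>>> > > > >>>>         return false;
>>> > > > >>>>     }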
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> ------------------------------
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> I've updated the implementation and the FLIP accordingly [1]. It
>>> > > > >>>> is quite a big change. In particular, for the Kafka connector, we
>>> > > > >>>> can now use a pluggable SplitPartitioner to support different split
>>> > > > >>>> assignment strategies (e.g., default, round-robin).
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> Could you please review it when you have a chance?
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> Best,
>>> > > > >>>>
>>> > > > >>>> Hongshun
>>> > > > >>>>
>>> > > > >>>>
>>> > > > >>>> [1]
>>> > > > >>>>
>>> > > >
>>> >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>> > > > >>>>
>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]>
>>> > > > wrote:
>>> > > > >>>>
>>> > > > >>>>> Hi Hongshun,
>>> > > > >>>>>
>>> > > > >>>>> I am not too concerned about the transmission cost, because the
>>> > > > >>>>> full split transmission has to happen in the initial assignment
>>> > > > >>>>> phase already. And in the future, we probably want to also
>>> > > > >>>>> introduce some kind of workload balance across source readers,
>>> > > > >>>>> e.g. based on the per-split throughput or the per-source-reader
>>> > > > >>>>> workload in heterogeneous clusters. For these reasons, we do not
>>> > > > >>>>> expect the Split objects to be huge, and we are not trying to
>>> > > > >>>>> design for huge Split objects either, as they will have problems
>>> > > > >>>>> even today.
>>> > > > >>>>>
>>> > > > >>>>> Good point on the potential split loss, please see the reply
>>> > below:
>>> > > > >>>>>
>>> > > > >>>>> Scenario 2:
>>> > > > >>>>>
>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3
>>> > > > >>>>>> and 4) upon restart.
>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have
>>> > > > >>>>>> empty states.
>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are
>>> > > > >>>>>> lost.
>>> > > > >>>>>
>>> > > > >>>>> The reader registration happens in SourceOperator.open(), which
>>> > > > >>>>> means the task is still in the initializing state; therefore the
>>> > > > >>>>> checkpoint should not be triggered until the enumerator receives
>>> > > > >>>>> all the split reports.
>>> > > > >>>>>
>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM
>>> > > > >>>>> is async. So it is possible that SourceOperator.open() has
>>> > > > >>>>> returned, but the enumerator has not received the split reports.
>>> > > > >>>>> However, the task status update RPC call goes to the same channel
>>> > > > >>>>> as the split reports call, so the task status RPC call will happen
>>> > > > >>>>> after the split reports call on the JM side. Therefore, on the JM
>>> > > > >>>>> side, the SourceCoordinator will always first receive the split
>>> > > > >>>>> reports, then receive the checkpoint request. This "happen before"
>>> > > > >>>>> relationship is kind of important to guarantee the consistent
>>> > > > >>>>> state between enumerator and readers.
>>> > > > >>>>>
>>> > > > >>>>> Scenario 1:
>>> > > > >>>>>
>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and
>>> > > > >>>>>> Reader B reports (3 and 4).
>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns
>>> > > > >>>>>> splits 1 and 2, not 3 and 4.
>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1
>>> > > > >>>>>> and 2 are recorded in the reader states; splits 3 and 4 are not
>>> > > > >>>>>> persisted.
>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3
>>> > > > >>>>>> and 4 will be permanently lost.
>>> > > > >>>>>
>>> > > > >>>>> This scenario is possible. One solution is to let the enumerator
>>> > > > >>>>> implementation handle this. That means if the enumerator relies
>>> > > > >>>>> on the initial split reports from the source readers, it should
>>> > > > >>>>> maintain these reports by itself. In the above example, the
>>> > > > >>>>> enumerator will need to remember that 3 and 4 are not assigned
>>> > > > >>>>> and put them into its own state.
>>> > > > >>>>>
>>> > > > >>>>> The current contract is that anything assigned to the
>>> > > > >>>>> SourceReaders is completely owned by the SourceReaders.
>>> > > > >>>>> Enumerators can remember the assignments but cannot change them,
>>> > > > >>>>> even when the source reader recovers / restarts.
>>> > > > >>>>>
>>> > > > >>>>> With this FLIP, the contract becomes that the source readers will
>>> > > > >>>>> return the ownership of the splits to the enumerator. So the
>>> > > > >>>>> enumerator is responsible for maintaining these splits until they
>>> > > > >>>>> are assigned to a source reader again.
>>> > > > >>>>>
>>> > > > >>>>> There are other cases where there may be conflicting information
>>> > > > >>>>> between reader and enumerator. For example, consider the
>>> > > > >>>>> following sequence:
>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2
>>> > > > >>>>> to reader B.
>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial
>>> > > > >>>>> failure, so only reader A restarts.
>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2)
>>> > > > >>>>> to the enumerator.
>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>> > > > >>>>>
>>> > > > >>>>> So with the new contract, the enumerator should be the source of
>>> > > > >>>>> truth for split ownership.
>>> > > > >>>>>
>>> > > > >>>>> Thanks,
>>> > > > >>>>>
>>> > > > >>>>> Jiangjie (Becket) Qin
>>> > > > >>>>>
>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang
>>> > > > >>>>> <[email protected]> wrote:
>>> > > > >>>>>
>>> > > > >>>>>> Hi Becket,
>>> > > > >>>>>>
>>> > > > >>>>>>
>>> > > > >>>>>> I did consider this approach at the beginning (and it was also
>>> > > > >>>>>> mentioned in this FLIP), since it would allow more flexibility
>>> > > > >>>>>> in reassigning all splits. However, there are a few potential
>>> > > > >>>>>> issues.
>>> > > > >>>>>>
>>> > > > >>>>>> 1. High transmission cost
>>> > > > >>>>>> If we pass the full split objects (rather than just split IDs),
>>> > > > >>>>>> the data size could be significant, leading to high overhead
>>> > > > >>>>>> during transmission, especially when many splits are involved.
>>> > > > >>>>>>
>>> > > > >>>>>> 2. Risk of split loss
>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism to
>>> > > > >>>>>> ensure a checkpoint can only be taken after all the splits are
>>> > > > >>>>>> reassigned. There are scenarios where splits could be lost due
>>> > > > >>>>>> to inconsistent state handling during recovery:
>>> > > > >>>>>>
>>> > > > >>>>>>
>>> > > > >>>>>> Scenario 1:
>>> > > > >>>>>>
>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2),
>>> > > > >>>>>>    and Reader B reports (3 and 4).
>>> > > > >>>>>>    2. The enumerator receives these reports but only reassigns
>>> > > > >>>>>>    splits 1 and 2, not 3 and 4.
>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only splits
>>> > > > >>>>>>    1 and 2 are recorded in the reader states; splits 3 and 4
>>> > > > >>>>>>    are not persisted.
>>> > > > >>>>>>    4. If the job is later restarted from this checkpoint,
>>> > > > >>>>>>    splits 3 and 4 will be permanently lost.
>>> > > > >>>>>>
>>> > > > >>>>>>
>>> > > > >>>>>> Scenario 2:
>>> > > > >>>>>>
>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports
>>> > > > >>>>>>    (3 and 4) upon restart.
>>> > > > >>>>>>    2. Before the enumerator receives all reports and performs
>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both readers
>>> > > > >>>>>>    have empty states.
>>> > > > >>>>>>    4. When restarting from this checkpoint, all four splits
>>> > > > >>>>>>    are lost.
>>> > > > >>>>>>
>>> > > > >>>>>>
>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate
>>> > > > >>>>>> these risks!
>>> > > > >>>>>>
>>> > > > >>>>>> Best
>>> > > > >>>>>> Hongshun
>>> > > > >>>>>>
>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]>
>>> > > > >>>>>> wrote:
>>> > > > >>>>>>
>>> > > > >>>>>>> Hi Hongshun,
>>> > > > >>>>>>>
>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the
>>> > > > >>>>>>> updated FLIP wiki, it would be good to see if we can keep the
>>> > > > >>>>>>> protocol simple. One alternative way to achieve this behavior
>>> > > > >>>>>>> is the following:
>>> > > > >>>>>>>
>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned splits to
>>> > > > >>>>>>> the enumerator. It does not add these splits to the
>>> > > > >>>>>>> SourceReader.
>>> > > > >>>>>>> 2. The enumerator will always use
>>> > > > >>>>>>> SourceEnumeratorContext.assignSplits() to assign the splits
>>> > > > >>>>>>> (not via the response of the SourceRegistrationEvent; this
>>> > > > >>>>>>> allows async split assignment in case the enumerator wants to
>>> > > > >>>>>>> wait until all the readers are registered).
>>> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits()
>>> > > > >>>>>>> when it receives the AddSplitEvent from the enumerator.
>>> > > > >>>>>>>
>>> > > > >>>>>>> This protocol has a few benefits:
>>> > > > >>>>>>> 1. It basically allows arbitrary split reassignment upon
>>> > > > >>>>>>> restart.
>>> > > > >>>>>>> 2. Simplicity: there is only one way to assign splits.
>>> > > > >>>>>>>
>>> > > > >>>>>>> So we only need one interface change:
>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
>>> > > > >>>>>>> Enumerator can access it.
>>> > > > >>>>>>> and one behavior change:
>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the
>>> > > > >>>>>>> SourceReader from state restoration, but
>>> > [message truncated...]
>>> >
>>>
>>
