Hi Becket,

> I am curious what would the enumerator do differently for the splits added
> via addSplitsBackOnRecovery() vs. addSplitsBack()?

In this FLIP, there are two distinct scenarios in which splits are added
back to the enumerator:
1. Job-level restore: the whole job is restored, and the splits in each
reader's state are reported via ReaderRegistrationEvent.
2. Reader-level restart: a single reader is restarted but not the whole job,
and the splits assigned to it after the last successful checkpoint are added
back. This is what addSplitsBack does today.


The enumerator should apply a different strategy in each case:
1. Job-level restore: the splits should be redistributed across readers
according to the current partitioner strategy.
2. Reader-level restart: the splits should be reassigned directly back to
the same reader they were originally assigned to, preserving locality and
avoiding unnecessary redistribution.

Therefore, the enumerator must clearly distinguish between these two
scenarios. I originally proposed deprecating the existing
addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new
addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
Leonard suggested introducing a separate method, addSplitsBackOnRecovery,
instead, so that the current addSplitsBack is not affected.
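
To make the difference concrete, here is a rough sketch of the two code
paths. It is only an illustration, not the FLIP's actual code: the
SketchEnumerator interface and RoundRobinEnumerator class are hypothetical
stand-ins, the round-robin policy is just one possible partitioner strategy,
and having the default implementation delegate to addSplitsBack is my
assumption about how backward compatibility could be kept.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SplitsBackSketch {

    /** Hypothetical stand-in for the two enumerator callbacks discussed above. */
    interface SketchEnumerator<SplitT> {
        /** Reader-level restart: splits assigned since the last successful checkpoint. */
        void addSplitsBack(List<SplitT> splits, int subtaskId);

        /**
         * Job-level restore: splits reported from reader state.
         * Assumption: the default delegates to addSplitsBack, so existing
         * enumerators keep their current behavior without any change.
         */
        default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
            addSplitsBack(splits, subtaskId);
        }
    }

    /** Toy enumerator that tracks assignments in memory only. */
    static class RoundRobinEnumerator implements SketchEnumerator<String> {
        final int parallelism;
        final Map<Integer, List<String>> assignment = new TreeMap<>();
        private int next = 0;

        RoundRobinEnumerator(int parallelism) {
            this.parallelism = parallelism;
        }

        @Override
        public void addSplitsBack(List<String> splits, int subtaskId) {
            // Reader-level restart: hand the splits straight back to the same reader.
            assignment.computeIfAbsent(subtaskId, k -> new ArrayList<>()).addAll(splits);
        }

        @Override
        public void addSplitsBackOnRecovery(List<String> splits, int subtaskId) {
            // Job-level restore: ignore the reporting reader and redistribute.
            for (String split : splits) {
                assignment.computeIfAbsent(next % parallelism, k -> new ArrayList<>()).add(split);
                next++;
            }
        }
    }

    public static void main(String[] args) {
        RoundRobinEnumerator enumerator = new RoundRobinEnumerator(2);
        enumerator.addSplitsBackOnRecovery(List.of("s1", "s2", "s3"), 0);
        enumerator.addSplitsBack(List.of("s4"), 1);
        System.out.println(enumerator.assignment); // prints {0=[s1, s3], 1=[s2, s4]}
    }
}
```

In this sketch, an enumerator that does not override addSplitsBackOnRecovery
would treat both scenarios the same way, which is the backward-compatible
behavior; only enumerators that care about redistribution need to override it.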

Best
Hongshun



On 2025/09/08 17:20:31 Becket Qin wrote:
> Hi Leonard,
>
>
> > Could we introduce a new method like addSplitsBackOnRecovery with a
> > default implementation? In this way, we can provide better backward
> > compatibility, and it also makes it easier for developers to understand.
>
>
> I am curious what would the enumerator do differently for the splits added
> via addSplitsBackOnRecovery() vs. addSplitsBack()? Today, addSplitsBack()
> is also only called upon recovery. So the new method seems confusing. One
> thing worth clarifying is if the Source implements
> SupportSplitReassignmentOnRecovery, upon recovery, should the splits
> reported by the readers also be added back to the SplitEnumerator via the
> addSplitsBack() call? Or should the SplitEnumerator explicitly query the
> registered reader information via the SplitEnumeratorContext to get the
> originally assigned splits when addReader() is invoked? I was assuming the
> latter in the beginning, so the behavior of addSplitsBack() remains
> unchanged, but I am not opposed to doing the former.
>
> Also, can you elaborate on the backwards compatibility issue you see if we
> do not have a separate addSplitsBackOnRecovery() method? Even without this
> new method, the behavior remains exactly the same unless the end users
> implement the mix-in interface of "SupportSplitReassignmentOnRecovery",
> right?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
> wrote:
>
> > Hi devs,
> >
> > It has been quite some time since this FLIP[1] was first proposed. Thank
> > you for your valuable feedback—based on your suggestions, the FLIP has
> > undergone several rounds of revisions.
> >
> > Any more advice is welcome and appreciated. If there are no further
> > concerns, I plan to start the vote tomorrow.
> >
> > Best
> > Hongshun
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
> >
> > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
> > wrote:
> >
> > > Hi Leonard,
> > > Thanks for your advice.  It makes sense and I have modified it.
> > >
> > > Best,
> > > Hongshun
> > >
> > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]> wrote:
> > >
> > >> Thanks Hongshun and Becket for the deep discussion.
> > >>
> > >> I only have one comment for one API design:
> > >>
> > >> > Deprecate the old addSplitsBack method, use an addSplitsBack with
> > >> param isReportedByReader instead, because the enumerator can apply
> > >> different reassignment policies based on the context.
> > >>
> > >> Could we introduce a new method like *addSplitsBackOnRecovery* with a
> > >> default implementation? In this way, we can provide better backward
> > >> compatibility, and it also makes it easier for developers to understand.
> > >>
> > >> Best,
> > >> Leonard
> > >>
> > >>
> > >>
> > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
> > >>
> > >> Hi Becket,
> > >>
> > >> I think that's a great idea! I have added the
> > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source that
> > >> implements this interface indicates that the source operator needs to
> > >> report splits to the enumerator and receive reassignment. [1]
> > >>
> > >> Best,
> > >> Hongshun
> > >>
> > >> [1]
> > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> > >>
> > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]> wrote:
> > >>
> > >>> Hi Hongshun,
> > >>>
> > >>> I think the convention for such optional features in Source is via
> > >>> mix-in interfaces. So instead of adding a method to the SourceReader,
> > >>> maybe we should introduce an interface SupportSplitReassignmentOnRecovery
> > >>> with this method. If a Source implementation implements that interface,
> > >>> then the SourceOperator will check the desired behavior and act
> > >>> accordingly.
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jiangjie (Becket) Qin
> > >>>
> > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <[email protected]>
> > >>> wrote:
> > >>>
> > >>>> Hi devs,
> > >>>>
> > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback
> > >>>> and suggestions.
> > >>>>
> > >>>> Best,
> > >>>> Hongshun
> > >>>>
> > >>>>
> > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
> > >>>>
> > >>>> Hi Becket,
> > >>>>
> > >>>> Thank you for your detailed feedback. The new contract makes good sense
> > >>>> to me and effectively addresses the issues I encountered at the
> > >>>> beginning of the design.
> > >>>>
> > >>>> That said, I recommend not reporting splits by default, primarily for
> > >>>> compatibility and practical reasons:
> > >>>>
> > >>>> > For these reasons, we do not expect the Split objects to be huge,
> > >>>> > and we are not trying to design for huge Split objects either as they
> > >>>> > will have problems even today.
> > >>>>
> > >>>>    1. Not all existing connectors match this rule.
> > >>>>    For example, in the MySQL CDC connector, a binlog split may contain
> > >>>>    hundreds (or even more) of snapshot split completion records. This
> > >>>>    state is large and is currently transmitted incrementally through
> > >>>>    multiple BinlogSplitMetaEvent messages. Since the binlog reader
> > >>>>    operates with single parallelism, reporting the full split state on
> > >>>>    recovery could be inefficient or even infeasible.
> > >>>>    For such sources, it would be better to provide a mechanism to skip
> > >>>>    split reporting during restart until they redesign and reduce the
> > >>>>    split size.
> > >>>>    2. Not all enumerators maintain unassigned splits in state.
> > >>>>    Some SplitEnumerator implementations (such as the Kafka connector)
> > >>>>    do not track or persistently manage unassigned splits. Requiring
> > >>>>    them to handle re-registration would add unnecessary complexity.
> > >>>>    Even if we implement this in the Kafka connector, the Kafka connector
> > >>>>    is currently decoupled from the Flink version, so we also need to
> > >>>>    make sure older versions remain compatible.
> > >>>>
> > >>>> ------------------------------
> > >>>>
> > >>>> To address these concerns, I propose introducing a new method: boolean
> > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
> > >>>> implementation returning false. This allows source readers to opt in
> > >>>> to split reassignment only when necessary. Since the new contract
> > >>>> already places the responsibility for split assignment on the
> > >>>> enumerator, not reporting splits by default is a safe and clean default
> > >>>> behavior.
> > >>>>
> > >>>>
> > >>>> ------------------------------
> > >>>>
> > >>>>
> > >>>> I’ve updated the implementation and the FLIP accordingly [1]. It is
> > >>>> quite a big change. In particular, for the Kafka connector, we can now
> > >>>> use a pluggable SplitPartitioner to support different split assignment
> > >>>> strategies (e.g., default, round-robin).
> > >>>>
> > >>>>
> > >>>> Could you please review it when you have a chance?
> > >>>>
> > >>>>
> > >>>> Best,
> > >>>>
> > >>>> Hongshun
> > >>>>
> > >>>>
> > >>>> [1]
> > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> > >>>>
> > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]> wrote:
> > >>>>
> > >>>>> Hi Hongshun,
> > >>>>>
> > >>>>> I am not too concerned about the transmission cost, because the full
> > >>>>> split transmission has to happen in the initial assignment phase
> > >>>>> already. And in the future, we probably want to also introduce some
> > >>>>> kind of workload balance across source readers, e.g. based on the
> > >>>>> per-split throughput or the per-source-reader workload in
> > >>>>> heterogeneous clusters. For these reasons, we do not expect the Split
> > >>>>> objects to be huge, and we are not trying to design for huge Split
> > >>>>> objects either as they will have problems even today.
> > >>>>>
> > >>>>> Good point on the potential split loss, please see the reply below:
> > >>>>>
> > >>>>> Scenario 2:
> > >>>>>
> > >>>>>
> > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3 and 4)
> > >>>>>> upon restart.
> > >>>>>> 2. Before the enumerator receives all reports and performs
> > >>>>>> reassignment, a checkpoint is triggered.
> > >>>>>> 3. Since no splits have been reassigned yet, both readers have empty
> > >>>>>> states.
> > >>>>>> 4. When restarting from this checkpoint, all four splits are lost.
> > >>>>>
> > >>>>> The reader registration happens in the SourceOperator.open(), which
> > >>>>> means the task is still in the initializing state, therefore the
> > >>>>> checkpoint should not be triggered until the enumerator receives all
> > >>>>> the split reports.
> > >>>>>
> > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is
> > >>>>> async. So it is possible that SourceOperator.open() has returned, but
> > >>>>> the enumerator has not received the split reports. However, because
> > >>>>> the task status update RPC call goes through the same channel as the
> > >>>>> split reports call, the task status RPC call will happen after the
> > >>>>> split reports call on the JM side. Therefore, on the JM side, the
> > >>>>> SourceCoordinator will always first receive the split reports, then
> > >>>>> receive the checkpoint request.
> > >>>>> This "happen before" relationship is kind of important to guarantee
> > >>>>> the consistent state between enumerator and readers.
> > >>>>>
> > >>>>> Scenario 1:
> > >>>>>
> > >>>>>
> > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2), and
> > >>>>>> Reader B reports (3 and 4).
> > >>>>>> 2. The enumerator receives these reports but only reassigns splits 1
> > >>>>>> and 2, not 3 and 4.
> > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1 and 2
> > >>>>>> are recorded in the reader states; splits 3 and 4 are not persisted.
> > >>>>>> 4. If the job is later restarted from this checkpoint, splits 3 and 4
> > >>>>>> will be permanently lost.
> > >>>>>
> > >>>>> This scenario is possible. One solution is to let the enumerator
> > >>>>> implementation handle this. That means if the enumerator relies on
> > >>>>> the initial split reports from the source readers, it should maintain
> > >>>>> these reports by itself. In the above example, the enumerator will
> > >>>>> need to remember that 3 and 4 are not assigned and put them into its
> > >>>>> own state.
> > >>>>> The current contract is that anything assigned to the SourceReaders
> > >>>>> is completely owned by the SourceReaders. Enumerators can remember
> > >>>>> the assignments but cannot change them, even when the source reader
> > >>>>> recovers / restarts.
> > >>>>> With this FLIP, the contract becomes that the source readers will
> > >>>>> return the ownership of the splits to the enumerator. So the
> > >>>>> enumerator is responsible for maintaining these splits, until they
> > >>>>> are assigned to a source reader again.
> > >>>>>
> > >>>>> There are other cases where there may be conflicting information
> > >>>>> between reader and enumerator. For example, consider the following
> > >>>>> sequence:
> > >>>>> 1. reader A reports splits (1 and 2) upon restart.
> > >>>>> 2. enumerator receives the report and assigns both 1 and 2 to reader B.
> > >>>>> 3. reader A failed before checkpointing. And this is a partial
> > >>>>> failure, so only reader A restarts.
> > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to
> > >>>>> the enumerator.
> > >>>>> 5. The enumerator should ignore this report because it has
> > >>>>> assigned splits (1 and 2) to reader B.
> > >>>>>
> > >>>>> So with the new contract, the enumerator should be the source of
> > >>>>> truth for split ownership.
> > >>>>>
> > >>>>> Thanks,
> > >>>>>
> > >>>>> Jiangjie (Becket) Qin
> > >>>>>
> > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <[email protected]>
> > >>>>> wrote:
> > >>>>>
> > >>>>>> Hi Becket,
> > >>>>>>
> > >>>>>>
> > >>>>>> I did consider this approach at the beginning (and it was also
> > >>>>>> mentioned in this FLIP), since it would allow more flexibility in
> > >>>>>> reassigning all splits. However, there are a few potential issues.
> > >>>>>>
> > >>>>>> 1. High Transmission Cost
> > >>>>>> If we pass the full split objects (rather than just split IDs), the
> > >>>>>> data size could be significant, leading to high overhead during
> > >>>>>> transmission, especially when many splits are involved.
> > >>>>>>
> > >>>>>> 2. Risk of Split Loss
> > >>>>>>
> > >>>>>> Risk of split loss exists unless we have a mechanism to make sure
> > >>>>>> a checkpoint can only be taken after all the splits are reassigned.
> > >>>>>> There are scenarios where splits could be lost due to inconsistent
> > >>>>>> state handling during recovery:
> > >>>>>>
> > >>>>>>
> > >>>>>> Scenario 1:
> > >>>>>>
> > >>>>>>
> > >>>>>>    1. Upon restart, Reader A reports assigned splits (1 and 2), and
> > >>>>>>    Reader B reports (3 and 4).
> > >>>>>>    2. The enumerator receives these reports but only reassigns
> > >>>>>>    splits 1 and 2, not 3 and 4.
> > >>>>>>    3. A checkpoint or savepoint is then triggered. Only splits 1 and
> > >>>>>>    2 are recorded in the reader states; splits 3 and 4 are not
> > >>>>>>    persisted.
> > >>>>>>    4. If the job is later restarted from this checkpoint, splits 3
> > >>>>>>    and 4 will be permanently lost.
> > >>>>>>
> > >>>>>>
> > >>>>>> Scenario 2:
> > >>>>>>
> > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports (3 and
> > >>>>>>    4) upon restart.
> > >>>>>>    2. Before the enumerator receives all reports and performs
> > >>>>>>    reassignment, a checkpoint is triggered.
> > >>>>>>    3. Since no splits have been reassigned yet, both readers have
> > >>>>>>    empty states.
> > >>>>>>    4. When restarting from this checkpoint, all four splits are lost.
> > >>>>>>
> > >>>>>>
> > >>>>>> Let me know if you have thoughts on how we might mitigate these
> > >>>>>> risks!
> > >>>>>>
> > >>>>>> Best
> > >>>>>> Hongshun
> > >>>>>>
> > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]>
> > >>>>>> wrote:
> > >>>>>>
> > >>>>>>> Hi Hongshun,
> > >>>>>>>
> > >>>>>>> The steps sound reasonable to me in general. In terms of the updated
> > >>>>>>> FLIP wiki, it would be good to see if we can keep the protocol
> > >>>>>>> simple. One alternative way to achieve this behavior is the following:
> > >>>>>>>
> > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
> > >>>>>>> ReaderRegistrationEvent with the currently assigned splits to the
> > >>>>>>> enumerator. It does not add these splits to the SourceReader.
> > >>>>>>> 2. The enumerator will always use
> > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not via
> > >>>>>>> the response to the ReaderRegistrationEvent; this allows async split
> > >>>>>>> assignment in case the enumerator wants to wait until all the readers
> > >>>>>>> are registered).
> > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits() when
> > >>>>>>> it receives the AddSplitEvent from the enumerator.
> > >>>>>>>
> > >>>>>>> This protocol has a few benefits:
> > >>>>>>> 1. it basically allows arbitrary split reassignment upon restart
> > >>>>>>> 2. simplicity: there is only one way to assign splits.
> > >>>>>>>
> > >>>>>>> So we only need one interface change:
> > >>>>>>> - add the initially assigned splits to ReaderInfo so the Enumerator
> > >>>>>>> can access it.
> > >>>>>>> and one behavior change:
> > >>>>>>> - The SourceOperator should stop assigning splits to the from
state
> > >>>>>>> restoration, but
[message truncated...]
