Hi Becket and Leonard,

Thanks for your advice.

> put all the reader information in the SplitEnumerator context

I have a concern: *the current registeredReaders in
SourceCoordinatorContext will be removed after subtaskReset or on execution
failure*. However, this approach has merit.


One more situation I found my previous design does not cover:

   1. Initial state: Reader A reports splits (1, 2).
   2. Enumerator action: Assigns split 1 to Reader A, and split 2 to Reader
   B.
   3. Failure scenario: Reader A fails before checkpointing. Since this is
   a partial failure, only Reader A restarts.
   4. Recovery issue: Upon recovery, Reader A re-reports split (1).

In my previous design, the enumerator would ignore Reader A's
re-registration, which would cause data loss.

Thus, when the enumerator receives a split, the split may originate
from three scenarios:

   1. The split is reported by a reader during a global restoration.
   2. The split is reported by a reader during a partial failure recovery.
   3. The split is not reported by a reader, but is assigned after the last
   successful checkpoint and was never acknowledged.

In the first scenario (global restore), the split should be re-distributed.
For the latter two scenarios (partial failover and post-checkpoint
assignment), we need to reassign the split to its originally assigned
subtask.
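
As a rough sketch of the rule above (every name here is hypothetical and not
part of the existing Flink API; the hash-based spread only stands in for
whatever partitioner strategy the enumerator actually uses):

```java
/** Hypothetical label for where a returned split came from. */
enum SplitOrigin { GLOBAL_RESTORE, PARTIAL_FAILOVER, POST_CHECKPOINT_ASSIGNMENT }

final class ReassignmentSketch {
    /**
     * Picks the target subtask for a split that came back to the enumerator.
     * Global restore: free to redistribute (placeholder: hash of the split ID).
     * Partial failover / post-checkpoint assignment: back to the original owner.
     */
    static int targetSubtask(
            SplitOrigin origin, String splitId, int originalOwner, int parallelism) {
        switch (origin) {
            case GLOBAL_RESTORE:
                return Math.floorMod(splitId.hashCode(), parallelism);
            case PARTIAL_FAILOVER:
            case POST_CHECKPOINT_ASSIGNMENT:
                return originalOwner;
            default:
                throw new IllegalArgumentException("Unknown origin: " + origin);
        }
    }
}
```

With this shape, only the global-restore branch ever moves a split to a
different subtask.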


By implementing a method in the SplitEnumerator context to track each
assigned split's status, the system can correctly identify and resolve
split ownership in all three scenarios. *What about adding a
`SplitRecoveryType splitRecoveryType(Split split)` method to
SplitEnumeratorContext?* SplitRecoveryType would be an enum with the values
`UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER`, and
`POST_CHECKPOINT_ASSIGNMENT`.
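
A minimal sketch of how such a classification might be tracked. Only the enum
values come from the proposal; the `SplitStatusTracker` class, its callbacks,
and the use of string split IDs are assumptions for illustration. It also
assumes this bookkeeping starts empty after a global restore:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Enum values as named in the proposal. */
enum SplitRecoveryType { UNASSIGNED, GLOBAL_RESTORE, PARTIAL_FAILOVER, POST_CHECKPOINT_ASSIGNMENT }

/** Hypothetical in-memory bookkeeping, keyed by split ID. */
final class SplitStatusTracker {
    // Owner of every split assigned by this coordinator instance.
    private final Map<String, Integer> owners = new HashMap<>();
    // Splits assigned since the last completed checkpoint.
    private final Set<String> pending = new HashSet<>();
    // Splits a reader reported at registration and not yet reassigned.
    private final Set<String> reported = new HashSet<>();

    void onAssigned(String splitId, int subtaskId) {
        owners.put(splitId, subtaskId);
        pending.add(splitId);
        reported.remove(splitId);
    }

    // Simplification: treats every earlier assignment as covered by the checkpoint.
    void onCheckpointComplete() {
        pending.clear();
    }

    void onReportedByReader(String splitId) {
        reported.add(splitId);
    }

    /** Returns the subtask the split was last assigned to, or null if unknown. */
    Integer originalOwner(String splitId) {
        return owners.get(splitId);
    }

    /** Intended only for splits that are being handed back to the enumerator. */
    SplitRecoveryType splitRecoveryType(String splitId) {
        if (reported.contains(splitId)) {
            // Reported and known here: the coordinator survived, so a partial failover.
            return owners.containsKey(splitId)
                    ? SplitRecoveryType.PARTIAL_FAILOVER
                    : SplitRecoveryType.GLOBAL_RESTORE;
        }
        return pending.contains(splitId)
                ? SplitRecoveryType.POST_CHECKPOINT_ASSIGNMENT
                : SplitRecoveryType.UNASSIGNED;
    }
}
```

Note that a split that is assigned, checkpointed, and never reported falls
through to `UNASSIGNED` here; a real design would need to decide how to
treat that case.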


What do you think? Are there any details or scenarios I haven't considered?
Looking forward to your advice.


Best,

Hongshun

On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]> wrote:

> Thanks for the explanation, Hongshun.
>
> The current pattern for handling new reader registration is as follows:
> 1. put all the reader information in the SplitEnumerator context
> 2. notify the enumerator about the new reader registration.
> 3. Let the split enumerator get whatever information it wants from the
> context and do its job.
> This pattern decouples the information passing and the reader registration
> notification. This makes the API extensible - we can add more information
> (e.g. reported assigned splits in our case) about the reader to the context
> without introducing new methods.
>
> Introducing a new addSplitsBackOnRecovery() method is redundant given the
> above pattern. Do we really need it?
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]>
> wrote:
>
> > Hi Becket,
> >
> > > I am curious what would the enumerator do differently for the splits
> > > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
> >
> > In this FLIP, there are two distinct scenarios in which the enumerator
> > receives splits being added back:
> > 1. Job-level restore: the job is restored, and the splits from each reader's
> > state are reported via ReaderRegistrationEvent.
> > 2. Reader-level restart: a single reader is restarted rather than the whole
> > job, and the splits assigned to it after the last successful checkpoint are
> > added back. This is what addSplitsBack used to do.
> >
> >
> > In these two situations, the enumerator will choose different strategies.
> > 1. Job-level restore: the splits should be redistributed across readers
> > according to the current partitioner strategy.
> > 2. Reader-level restart: the splits should be reassigned directly back to
> > the same reader they were originally assigned to, preserving locality and
> > avoiding unnecessary redistribution
> >
> > Therefore, the enumerator must clearly distinguish between these two
> > scenarios. I previously proposed deprecating the existing
> > addSplitsBack(List<SplitT> splits, int subtaskId) and adding a new
> > addSplitsBack(List<SplitT> splits, int subtaskId, boolean reportedByReader).
> > Leonard suggested adding a separate method, addSplitsBackOnRecovery, so that
> > the current addSplitsBack is not affected.
> >
> > Best
> > Hongshun
> >
> >
> >
> > On 2025/09/08 17:20:31 Becket Qin wrote:
> > > Hi Leonard,
> > >
> > >
> > > > Could we introduce a new method like addSplitsBackOnRecovery  with default
> > > > implementation. In this way, we can provide better backward compatibility
> > > > and also makes it easier for developers to understand.
> > >
> > >
> > > I am curious what would the enumerator do differently for the splits added
> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?  Today, addSplitsBack()
> > > is also only called upon recovery. So the new method seems confusing. One
> > > thing worth clarifying is if the Source implements
> > > SupportSplitReassignmentOnRecovery, upon recovery, should the splits
> > > reported by the readers also be added back to the SplitEnumerator via the
> > > addSplitsBack() call? Or should the SplitEnumerator explicitly query the
> > > registered reader information via the SplitEnumeratorContext to get the
> > > originally assigned splits when addReader() is invoked? I was assuming the
> > > latter in the beginning, so the behavior of addSplitsBack() remains
> > > unchanged, but I am not opposed to doing the former.
> > >
> > > Also, can you elaborate on the backwards compatibility issue you see if we
> > > do not have a separate addSplitsBackOnRecovery() method? Even without this
> > > new method, the behavior remains exactly the same unless the end users
> > > implement the mix-in interface of "SupportSplitReassignmentOnRecovery",
> > > right?
> > >
> > > Thanks,
> > >
> > > Jiangjie (Becket) Qin
> > >
> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
> > > wrote:
> > >
> > > > Hi devs,
> > > >
> > > > It has been quite some time since this FLIP[1] was first proposed. Thank
> > > > you for your valuable feedback. Based on your suggestions, the FLIP has
> > > > undergone several rounds of revisions.
> > > >
> > > > Any more advice is welcome and appreciated. If there are no further
> > > > concerns, I plan to start the vote tomorrow.
> > > >
> > > > Best
> > > > Hongshun
> > > >
> > > > [1]
> > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
> > > >
> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
> > > > wrote:
> > > >
> > > > > Hi Leonard,
> > > > > Thanks for your advice.  It makes sense and I have modified it.
> > > > >
> > > > > Best,
> > > > > Hongshun
> > > > >
> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]>
> wrote:
> > > > >
> > > > >> Thanks Hongshun and Becket for the deep discussion.
> > > > >>
> > > > >> I only have one comment for one API design:
> > > > >>
> > > > >> > Deprecate the old addSplitsBack  method, use a addSplitsBack with
> > > > >> > param isReportedByReader instead. Because, The enumerator can apply
> > > > >> > different reassignment policies based on the context.
> > > > >>
> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with
> > > > >> default implementation. In this way, we can provide better backward
> > > > >> compatibility and also makes it easier for developers to understand.
> > > > >>
> > > > >> Best,
> > > > >> Leonard
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
> > > > >>
> > > > >> Hi Becket,
> > > > >>
> > > > >> I think that's a great idea!  I have added the
> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source that
> > > > >> implements this interface indicates that the source operator needs to
> > > > >> report splits to the enumerator and receive reassignments. [1]
> > > > >>
> > > > >> Best,
> > > > >> Hongshun
> > > > >>
> > > > >> [1]
> > > > >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> > > > >>
> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]>
> > > > wrote:
> > > > >>
> > > > >>> Hi Hongshun,
> > > > >>>
> > > > >>> I think the convention for such optional features in Source is via
> > > > >>> mix-in interfaces. So instead of adding a method to the SourceReader,
> > > > >>> maybe we should introduce an interface SupportSplitReassignmentOnRecovery
> > > > >>> with this method. If a Source implementation implements that interface,
> > > > >>> then the SourceOperator will check the desired behavior and act
> > > > >>> accordingly.
> > > > >>>
> > > > >>> Thanks,
> > > > >>>
> > > > >>> Jiangjie (Becket) Qin
> > > > >>>
> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
> > [email protected]
> > > > >
> > > > >>> wrote:
> > > > >>>
> > > > >>>> Hi devs,
> > > > >>>>
> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your feedback
> > > > >>>> and suggestions.
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Hongshun
> > > > >>>>
> > > > >>>>
> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
> > > > >>>>
> > > > >>>> Hi Becket,
> > > > >>>>
> > > > >>>> Thank you for your detailed feedback. The new contract makes good sense
> > > > >>>> to me and effectively addresses the issues I encountered at the beginning
> > > > >>>> of the design.
> > > > >>>>
> > > > >>>> That said, I recommend not reporting splits by default, primarily for
> > > > >>>> compatibility and practical reasons:
> > > > >>>>
> > > > >>>> >  For these reasons, we do not expect the Split objects to be huge,
> > > > >>>> > and we are not trying to design for huge Split objects either as they
> > > > >>>> > will have problems even today.
> > > > >>>>
> > > > >>>>    1. Not all existing connectors match this rule.
> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may contain
> > > > >>>>    hundreds (or even more) snapshot split completion records. This state
> > > > >>>>    is large and is currently transmitted incrementally through multiple
> > > > >>>>    BinlogSplitMetaEvent messages. Since the binlog reader operates
> > > > >>>>    with single parallelism, reporting the full split state on recovery
> > > > >>>>    could be inefficient or even infeasible.
> > > > >>>>    For such sources, it would be better to provide a mechanism to skip
> > > > >>>>    split reporting during restart until they redesign and reduce the
> > > > >>>>    split size.
> > > > >>>>
> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka connector) do
> > > > >>>>    not track or persistently manage unassigned splits. Requiring them to
> > > > >>>>    handle re-registration would add unnecessary complexity. Even if we
> > > > >>>>    implement this in the Kafka connector, the Kafka connector is
> > > > >>>>    currently decoupled from the Flink version, so we also need to make
> > > > >>>>    sure older versions remain compatible.
> > > > >>>>
> > > > >>>> ------------------------------
> > > > >>>>
> > > > >>>> To address these concerns, I propose introducing a new method: boolean
> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
> > > > >>>> implementation returning false. This allows source readers to opt in
> > > > >>>> to split reassignment only when necessary. Since the new contract already
> > > > >>>> places the responsibility for split assignment on the enumerator, not
> > > > >>>> reporting splits by default is a safe and clean default behavior.
> > > > >>>>
> > > > >>>>
> > > > >>>> ------------------------------
> > > > >>>>
> > > > >>>>
> > > > >>>> I’ve updated the implementation and the FLIP accordingly[1]. It is quite
> > > > >>>> a big change. In particular, for the Kafka connector, we can now use a
> > > > >>>> pluggable SplitPartitioner to support different split assignment
> > > > >>>> strategies (e.g., default, round-robin).
> > > > >>>>
> > > > >>>>
> > > > >>>> Could you please review it when you have a chance?
> > > > >>>>
> > > > >>>>
> > > > >>>> Best,
> > > > >>>>
> > > > >>>> Hongshun
> > > > >>>>
> > > > >>>>
> > > > >>>> [1]
> > > > >>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
> > > > >>>>
> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]>
> > > > wrote:
> > > > >>>>
> > > > >>>>> Hi Hongshun,
> > > > >>>>>
> > > > >>>>> I am not too concerned about the transmission cost. Because the
> > full
> > > > >>>>> split transmission has to happen in the initial assignment
> phase
> > > > already.
> > > > >>>>> And in the future, we probably want to also introduce some kind
> > of
> > > > workload
> > > > >>>>> balance across source readers, e.g. based on the per-split
> > > > throughput or
> > > > >>>>> the per-source-reader workload in heterogeneous clusters. For
> > these
> > > > >>>>> reasons, we do not expect the Split objects to be huge, and we
> > are
> > > > not
> > > > >>>>> trying to design for huge Split objects either as they will
> have
> > > > problems
> > > > >>>>> even today.
> > > > >>>>>
> > > > >>>>> Good point on the potential split loss, please see the reply
> > below:
> > > > >>>>>
> > > > >>>>> Scenario 2:
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports (3
> > and 4)
> > > > >>>>>> upon restart.
> > > > >>>>>> 2. Before the enumerator receives all reports and performs
> > > > >>>>>> reassignment, a checkpoint is triggered.
> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers have
> > empty
> > > > >>>>>> states.
> > > > >>>>>> 4. When restarting from this checkpoint, all four splits are
> > lost.
> > > > >>>>>
> > > > >>>>> The reader registration happens in the SourceOperator.open(),
> > which
> > > > >>>>> means the task is still in the initializing state, therefore
> the
> > > > checkpoint
> > > > >>>>> should not be triggered until the enumerator receives all the
> > split
> > > > reports.
> > > > >>>>>
> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM is
> > > > >>>>> async. So it is possible that SourceOperator.open() has returned, but
> > > > >>>>> the enumerator has not received the split reports. However, because
> > > > >>>>> the task status update RPC call goes through the same channel as the
> > > > >>>>> split reports call, the task status RPC call will happen after the
> > > > >>>>> split reports call on the JM side. Therefore, on the JM side, the
> > > > >>>>> SourceCoordinator will always first receive the split reports, then
> > > > >>>>> receive the checkpoint request.
> > > > >>>>> This "happens before" relationship is important to guarantee
> > > > >>>>> consistent state between the enumerator and the readers.
> > > > >>>>>
> > > > >>>>> Scenario 1:
> > > > >>>>>
> > > > >>>>>
> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and 2),
> and
> > > > >>>>>> Reader B reports (3 and 4).
> > > > >>>>>> 2. The enumerator receives these reports but only reassigns
> > splits 1
> > > > >>>>>> and 2 — not 3 and 4.
> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only splits 1
> > and 2
> > > > >>>>>> are recorded in the reader states; splits 3 and 4 are not
> > persisted.
> > > > >>>>>> 4. If the job is later restarted from this checkpoint, splits
> 3
> > and
> > > > 4
> > > > >>>>>> will be permanently lost.
> > > > >>>>>
> > > > >>>>> This scenario is possible. One solution is to let the
> enumerator
> > > > >>>>> implementation handle this. That means if the enumerator relies
> > on
> > > > the
> > > > >>>>> initial split reports from the source readers, it should
> maintain
> > > > these
> > > > >>>>> reports by itself. In the above example, the enumerator will
> need
> > to
> > > > >>>>> remember that 3 and 4 are not assigned and put it into its own
> > state.
> > > > >>>>> The current contract is that anything assigned to the
> > SourceReaders
> > > > >>>>> are completely owned by the SourceReaders. Enumerators can
> > remember
> > > > the
> > > > >>>>> assignments but cannot change them, even when the source reader
> > > > recovers /
> > > > >>>>> restarts.
> > > > >>>>> With this FLIP, the contract becomes that the source readers
> will
> > > > >>>>> return the ownership of the splits to the enumerator. So the
> > > > enumerator is
> > > > >>>>> responsible for maintaining these splits, until they are
> assigned
> > to
> > > > a
> > > > >>>>> source reader again.
> > > > >>>>>
> > > > >>>>> There are other cases where there may be conflicting information
> > > > >>>>> between reader and enumerator. For example, consider the following
> > > > >>>>> sequence:
> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to
> > > > >>>>> reader B.
> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial
> > > > >>>>> failure, so only reader A restarts.
> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2) to
> > > > >>>>> the enumerator.
> > > > >>>>> 5. The enumerator should ignore this report because it has
> > > > >>>>> assigned splits (1 and 2) to reader B.
> > > > >>>>>
> > > > >>>>> So with the new contract, the enumerator should be the source
> of
> > > > truth
> > > > >>>>> for split ownership.
> > > > >>>>>
> > > > >>>>> Thanks,
> > > > >>>>>
> > > > >>>>> Jiangjie (Becket) Qin
> > > > >>>>>
> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
> > > > [email protected]>
> > > > >>>>> wrote:
> > > > >>>>>
> > > > >>>>>> Hi Becket,
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> I did consider this approach at the beginning (and it was also
> > > > >>>>>> mentioned in this FLIP), since it would allow more flexibility
> > in
> > > > >>>>>> reassigning all splits. However, there are a few potential
> > issues.
> > > > >>>>>>
> > > > >>>>>> 1. High Transmission Cost
> > > > >>>>>> If we pass the full split objects (rather than just split
> IDs),
> > the
> > > > >>>>>> data size could be significant, leading to high overhead
> during
> > > > >>>>>> transmission — especially when many splits are involved.
> > > > >>>>>>
> > > > >>>>>> 2. Risk of Split Loss
> > > > >>>>>>
> > > > >>>>>> There is a risk of split loss unless we have a mechanism to ensure
> > > > >>>>>> that a checkpoint can only happen after all the splits are reassigned.
> > > > >>>>>> There are scenarios where splits could be lost due to inconsistent
> > > > >>>>>> state handling during recovery:
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Scenario 1:
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1 and
> 2),
> > and
> > > > >>>>>>    Reader B reports (3 and 4).
> > > > >>>>>>    2. The enumerator receives these reports but only reassigns
> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only
> splits 1
> > and
> > > > >>>>>>    2 are recorded in the reader states; splits 3 and 4 are not
> > > > persisted.
> > > > >>>>>>    4. If the job is later restarted from this checkpoint,
> splits
> > 3
> > > > >>>>>>    and 4 will be permanently lost.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Scenario 2:
> > > > >>>>>>
> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B reports
> (3
> > and
> > > > >>>>>>    4) upon restart.
> > > > >>>>>>    2. Before the enumerator receives all reports and performs
> > > > >>>>>>    reassignment, a checkpoint is triggered.
> > > > >>>>>>    3. Since no splits have been reassigned yet, both readers
> > have
> > > > >>>>>>    empty states.
> > > > >>>>>>    4. When restarting from this checkpoint, all four splits
> are
> > > > lost.
> > > > >>>>>>
> > > > >>>>>>
> > > > >>>>>> Let me know if you have thoughts on how we might mitigate
> these
> > > > risks!
> > > > >>>>>>
> > > > >>>>>> Best
> > > > >>>>>> Hongshun
> > > > >>>>>>
> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]>
> > > > >>>>>> wrote:
> > > > >>>>>>
> > > > >>>>>>> Hi Hongshun,
> > > > >>>>>>>
> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the updated
> > > > >>>>>>> FLIP wiki, it would be good to see if we can keep the protocol simple.
> > > > >>>>>>> One alternative way to achieve this behavior is as follows:
> > > > >>>>>>>
> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned splits to
> > the
> > > > >>>>>>> enumerator. It does not add these splits to the SourceReader.
> > > > >>>>>>> 2. The enumerator will always use
> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not via
> > > > >>>>>>> the response to the ReaderRegistrationEvent; this allows async split
> > > > >>>>>>> assignment in case the enumerator wants to wait until all the readers
> > > > >>>>>>> are registered).
> > > > >>>>>>> 3. The SourceOperator will only call SourceReader.addSplits()
> > when
> > > > >>>>>>> it receives the AddSplitEvent from the enumerator.
> > > > >>>>>>>
> > > > >>>>>>> This protocol has a few benefits:
> > > > >>>>>>> 1. it basically allows arbitrary split reassignment upon
> > restart
> > > > >>>>>>> 2. simplicity: there is only one way to assign splits.
> > > > >>>>>>>
> > > > >>>>>>> So we only need one interface change:
> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
> > Enumerator
> > > > >>>>>>> can access it.
> > > > >>>>>>> and one behavior change:
> > > > >>>>>>> - The SourceOperator should stop assigning splits to the from
> > state
> > > > >>>>>>> restoration, but
> > [message truncated...]
> >
>
