Hi Becket and Leonard,

I have updated the content of this FLIP. The key point is that:

When the split enumerator receives a split, *the split must already exist
in pendingSplitAssignments or assignedSplitAssignments*.

   - If the split is in pendingSplitAssignments, ignore it.
   - If the split is in assignedSplitAssignments but has a different taskId,
   ignore it (this indicates it was already assigned to another task).
   - If the split is in assignedSplitAssignments and shares the same taskId,
   move it from assignedSplitAssignments to pendingSplitAssignments so that
   it is assigned again.
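
The three rules above can be sketched in Java roughly as follows. This is
only an illustration under stated assumptions, not the FLIP's implementation:
the class SplitAssignmentTracker, its methods, the Action enum, and the use
of String split ids are all hypothetical; only the two collection names come
from the description above. The sketch also assumes that a split found in
neither collection is an error.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a Flink class): tracks split ownership in an enumerator. */
class SplitAssignmentTracker {
    /** Outcome of handling one received split. */
    enum Action { IGNORED_PENDING, IGNORED_OWNED_BY_OTHER, MOVED_TO_PENDING }

    // Splits waiting to be assigned to some reader.
    private final Set<String> pendingSplitAssignments = new HashSet<>();
    // Split id -> taskId of the reader the split is currently assigned to.
    private final Map<String, Integer> assignedSplitAssignments = new HashMap<>();

    void addPending(String splitId) {
        pendingSplitAssignments.add(splitId);
    }

    void assign(String splitId, int taskId) {
        pendingSplitAssignments.remove(splitId);
        assignedSplitAssignments.put(splitId, taskId);
    }

    boolean isPending(String splitId) {
        return pendingSplitAssignments.contains(splitId);
    }

    /** Applies the three rules to a split received from the given task. */
    Action onSplitReceived(String splitId, int taskId) {
        if (pendingSplitAssignments.contains(splitId)) {
            // Rule 1: already waiting for assignment, nothing to do.
            return Action.IGNORED_PENDING;
        }
        Integer owner = assignedSplitAssignments.get(splitId);
        if (owner == null) {
            // Assumption of this sketch: an unknown split violates the invariant.
            throw new IllegalStateException("Unknown split: " + splitId);
        }
        if (owner != taskId) {
            // Rule 2: the split was already assigned to another task.
            return Action.IGNORED_OWNED_BY_OTHER;
        }
        // Rule 3: same task, so move the split back to pending for re-assignment.
        assignedSplitAssignments.remove(splitId);
        pendingSplitAssignments.add(splitId);
        return Action.MOVED_TO_PENDING;
    }
}
```

For example, if split "s1" is assigned to task 0 and task 1 later reports it,
the report is ignored; if task 0 reports it, "s1" goes back to pending.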


To make it easier to understand why these strategies are used, I added some
examples and pictures to illustrate them.

Could you help me check whether any problems remain?

Best
Hongshun



On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:

> Thanks Becket and Hongshun for the insightful discussion.
>
> The underlying implementation and communication mechanisms of Flink Source
> indeed involve many intricate details. We discussed the issue of split
> re-assignment in specific scenarios, and fortunately the final decision
> turned out to be pretty clear.
>
> +1 to Becket’s proposal, which keeps the framework cleaner and more flexible.
> +1 to Hongshun’s point to provide comprehensive guidance for connector
> developers.
>
>
> Best,
> Leonard
>
>
>
> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>
> Hi Becket,
>
> I got it. You’re suggesting we should not handle this in the source
> framework but instead let the split enumerator manage these three scenarios.
>
> Let me explain why I originally favored handling it in the framework: I'm
> concerned that connector developers might overlook certain edge cases
> (after all, even we needed extensive discussion to fully clarify the logic).
>
> However, your approach keeps the framework cleaner and more flexible, so I
> will adopt it.
>
> Perhaps, in this FLIP, we should focus on providing comprehensive
> guidance for connector developers: explain how to implement a split
> enumerator, including the underlying challenges and their solutions.
>
>
> Additionally, we can use the Kafka connector as a reference
> implementation to demonstrate the practical steps. This way, developers who
> want to implement similar connectors can directly reference this example.
>
>
> Best,
> Hongshun
>
>
>
> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>
>> It would be good to not expose runtime details to the source
>> implementation if possible.
>>
>> Today, the split enumerator implementations are expected to track the
>> split assignment.
>>
>> Assuming the split enumerator implementation keeps a split assignment
>> map, that means the enumerator should already know whether a split is
>> assigned or unassigned. So it can handle the three scenarios you mentioned.
>>
>>> The split is reported by a reader during a global restoration.
>>
>> The split enumerator should have just been restored / created. If the
>> enumerator expects a full reassignment of splits upon global recovery,
>> there should be no assigned splits to that reader in the split assignment
>> mapping.
>>
>>> The split is reported by a reader during a partial failure recovery.
>>
>> In this case, when SplitEnumerator.addReader() is invoked, the split
>> assignment map in the enumerator implementation should already have some
>> split assignments for the reader. Therefore it is a partial failover. If
>> the source supports split reassignment on recovery, the enumerator can
>> assign splits that are different from the reported assignment of that
>> reader in the SplitEnumeratorContext, or it can also assign the same
>> splits. In any case, the enumerator knows that this is a partial recovery
>> because the assignment map is non-empty.
>>
>>> The split is not reported by a reader, but is assigned after the last
>>> successful checkpoint and was never acknowledged.
>>
>> This is actually one of the steps in the partial failure recovery.
>> SplitEnumerator.addSplitsBack() will be called first, before
>> SplitEnumerator.addReader() is called for the recovered reader. When
>> SplitEnumerator.addSplitsBack() is invoked, it is certainly a partial
>> recovery, and the enumerator should remove these splits from the split
>> assignment map as if they were never assigned.
>>
>> I think this should work, right?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]>
>> wrote:
>>
>>> Hi Becket and Leonard,
>>>
>>> Thanks for your advice.
>>>
>>> > put all the reader information in the SplitEnumerator context
>>> I have a concern: the current registeredReaders in
>>> *SourceCoordinatorContext will be removed after subtaskResetor execution on
>>> failure*. However, this approach has merit.
>>>
>>> One more situation I found my previous design does not cover:
>>>
>>>    1. Initial state: Reader A reports splits (1, 2).
>>>    2. Enumerator action: Assigns split 1 to Reader A, and split 2 to
>>>    Reader B.
>>>    3. Failure scenario: Reader A fails before checkpointing. Since this
>>>    is a partial failure, only Reader A restarts.
>>>    4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>
>>> In my previous design, the enumerator will ignore Reader A's
>>> re-registration which will cause data loss.
>>>
>>> Thus, when the enumerator receives a split, the split may originate
>>> from three scenarios:
>>>
>>>    1. The split is reported by a reader during a global restoration.
>>>    2. The split is reported by a reader during a partial failure
>>>    recovery.
>>>    3. The split is not reported by a reader, but is assigned after the
>>>    last successful checkpoint and was never acknowledged.
>>>
>>> In the first scenario (global restore), the split should
>>> be re-distributed. For the latter two scenarios (partial failover and
>>> post-checkpoint assignment), we need to reassign the split to
>>> its originally assigned subtask.
>>>
>>> By implementing a method in the SplitEnumerator context to track each
>>> assigned split's status, the system can correctly identify and resolve
>>> split ownership in all three scenarios. *What about adding a
>>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>>> SplitEnumeratorContext?* SplitRecoveryType is an enum including
>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER`, and
>>> `POST_CHECKPOINT_ASSIGNMENT`.
>>>
>>> What do you think? Are there any details or scenarios I haven't
>>> considered? Looking forward to your advice.
>>>
>>> Best,
>>> Hongshun
>>>
>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]>
>>> wrote:
>>>
>>>> Thanks for the explanation, Hongshun.
>>>>
>>>> The current pattern for handling new reader registration is as follows:
>>>> 1. put all the reader information in the SplitEnumerator context
>>>> 2. notify the enumerator about the new reader registration.
>>>> 3. Let the split enumerator get whatever information it wants from the
>>>> context and do its job.
>>>> This pattern decouples the information passing and the reader
>>>> registration
>>>> notification. This makes the API extensible - we can add more
>>>> information
>>>> (e.g. reported assigned splits in our case) about the reader to the
>>>> context
>>>> without introducing new methods.
>>>>
>>>> Introducing a new addSplitsBackOnRecovery() method is redundant given the
>>>> above pattern. Do we really need it?
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]>
>>>> wrote:
>>>>
>>>> > Hi Becket,
>>>> >
>>>> > > I am curious what the enumerator would do differently for the splits
>>>> > > added via addSplitsBackOnRecovery() vs. addSplitsBack()?
>>>> >
>>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>>> > receives splits being added back:
>>>> > 1. Job-level restore: the job is restored, and the splits from each
>>>> > reader's state are reported via ReaderRegistrationEvent.
>>>> > 2. Reader-level restart: a single reader is restarted but not the whole
>>>> > job, and the splits assigned to it after the last successful checkpoint
>>>> > are added back. This is what addSplitsBack used to do.
>>>> >
>>>> >
>>>> > In these two situations, the enumerator will choose different strategies.
>>>> > 1. Job-level restore: the splits should be redistributed across readers
>>>> > according to the current partitioner strategy.
>>>> > 2. Reader-level restart: the splits should be reassigned directly back to
>>>> > the same reader they were originally assigned to, preserving locality and
>>>> > avoiding unnecessary redistribution.
>>>> >
>>>> > Therefore, the enumerator must clearly distinguish between these two
>>>> > scenarios. I originally deprecated the old addSplitsBack(List<SplitT>
>>>> > splits, int subtaskId) and added a new addSplitsBack(List<SplitT>
>>>> > splits, int subtaskId, boolean reportedByReader).
>>>> > Leonard suggested adding a separate method, addSplitsBackOnRecovery, so
>>>> > that the existing addSplitsBack is not affected.
>>>> >
>>>> > Best
>>>> > Hongshun
>>>> >
>>>> >
>>>> >
>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>> > > Hi Leonard,
>>>> > >
>>>> > >
>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery with a
>>>> > > > default implementation? This way, we can provide better backward
>>>> > > > compatibility and also make it easier for developers to understand.
>>>> > >
>>>> > >
>>>> > > I am curious what the enumerator would do differently for the splits
>>>> > > added via addSplitsBackOnRecovery() vs. addSplitsBack(). Today,
>>>> > > addSplitsBack() is also only called upon recovery, so the new method
>>>> > > seems confusing. One thing worth clarifying: if the Source implements
>>>> > > SupportSplitReassignmentOnRecovery, should the splits reported by the
>>>> > > readers upon recovery also be added back to the SplitEnumerator via
>>>> > > the addSplitsBack() call? Or should the SplitEnumerator explicitly
>>>> > > query the registered reader information via the SplitEnumeratorContext
>>>> > > to get the originally assigned splits when addReader() is invoked? I
>>>> > > was assuming the latter in the beginning, so the behavior of
>>>> > > addSplitsBack() remains unchanged, but I am not opposed to doing the
>>>> > > former.
>>>> > >
>>>> > > Also, can you elaborate on the backwards compatibility issue you see if
>>>> > > we do not have a separate addSplitsBackOnRecovery() method? Even without
>>>> > > this new method, the behavior remains exactly the same unless the end
>>>> > > users implement the mix-in interface "SupportSplitReassignmentOnRecovery",
>>>> > > right?
>>>> > >
>>>> > > Thanks,
>>>> > >
>>>> > > Jiangjie (Becket) Qin
>>>> > >
>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
>>>> > > wrote:
>>>> > >
>>>> > > > Hi devs,
>>>> > > >
>>>> > > > It has been quite some time since this FLIP[1] was first proposed.
>>>> > Thank
>>>> > > > you for your valuable feedback—based on your suggestions, the
>>>> FLIP has
>>>> > > > undergone several rounds of revisions.
>>>> > > >
>>>> > > > Any more advice is welcome and appreciated. If there are no
>>>> further
>>>> > > > concerns, I plan to start the vote tomorrow.
>>>> > > >
>>>> > > > Best
>>>> > > > Hongshun
>>>> > > >
>>>> > > > [1]
>>>> > > >
>>>> >
>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>> > > >
>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
>>>> > > > wrote:
>>>> > > >
>>>> > > > > Hi Leonard,
>>>> > > > > Thanks for your advice.  It makes sense and I have modified it.
>>>> > > > >
>>>> > > > > Best,
>>>> > > > > Hongshun
>>>> > > > >
>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]>
>>>> wrote:
>>>> > > > >
>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>> > > > >>
>>>> > > > >> I only have one comment for one API design:
>>>> > > > >>
>>>> > > > >> > Deprecate the old addSplitsBack method and use an addSplitsBack
>>>> > > > >> > with the param isReportedByReader instead, because the enumerator
>>>> > > > >> > can apply different reassignment policies based on the context.
>>>> > > > >>
>>>> > > > >> Could we introduce a new method like *addSplitsBackOnRecovery* with
>>>> > > > >> a default implementation? This way, we can provide better backward
>>>> > > > >> compatibility and also make it easier for developers to understand.
>>>> > > > >>
>>>> > > > >> Best,
>>>> > > > >> Leonard
>>>> > > > >>
>>>> > > > >>
>>>> > > > >>
>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>> > > > >>
>>>> > > > >> Hi Becket,
>>>> > > > >>
>>>> > > > >> I think that's a great idea! I have added the
>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source
>>>> > > > >> that implements this interface indicates that the source operator
>>>> > > > >> needs to report splits to the enumerator and receive reassignment.[1]
>>>> > > > >>
>>>> > > > >> Best,
>>>> > > > >> Hongshun
>>>> > > > >>
>>>> > > > >> [1]
>>>> > > > >>
>>>> > > >
>>>> >
>>>> >
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > > > >>
>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]>
>>>> > > > wrote:
>>>> > > > >>
>>>> > > > >>> Hi Hongshun,
>>>> > > > >>>
>>>> > > > >>> I think the convention for such optional features in Source is via
>>>> > > > >>> mix-in interfaces. So instead of adding a method to the SourceReader,
>>>> > > > >>> maybe we should introduce an interface
>>>> > > > >>> SupportSplitReassignmentOnRecovery with this method. If a Source
>>>> > > > >>> implementation implements that interface, then the SourceOperator
>>>> > > > >>> will check the desired behavior and act accordingly.
>>>> > > > >>>
>>>> > > > >>> Thanks,
>>>> > > > >>>
>>>> > > > >>> Jiangjie (Becket) Qin
>>>> > > > >>>
>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>> > [email protected]
>>>> > > > >
>>>> > > > >>> wrote:
>>>> > > > >>>
>>>> > > > >>>> Hi devs,
>>>> > > > >>>>
>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your
>>>> > feedback
>>>> > > > >>>> and suggestions.
>>>> > > > >>>>
>>>> > > > >>>> Best,
>>>> > > > >>>> Hongshun
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>> > > > >>>>
>>>> > > > >>>> Hi Becket,
>>>> > > > >>>>
>>>> > > > >>>> Thank you for your detailed feedback. The new contract makes
>>>> good
>>>> > > > sense
>>>> > > > >>>> to me and effectively addresses the issues I encountered at
>>>> the
>>>> > > > beginning
>>>> > > > >>>> of the design.
>>>> > > > >>>>
>>>> > > > >>>> That said, I recommend not reporting splits by default,
>>>> primarily
>>>> > for
>>>> > > > >>>> compatibility and practical reasons:
>>>> > > > >>>>
>>>> > > > >>>> >  For these reasons, we do not expect the Split objects to
>>>> be
>>>> > huge,
>>>> > > > >>>> and we are not trying to design for huge Split objects
>>>> either as
>>>> > they
>>>> > > > will
>>>> > > > >>>> have problems even today.
>>>> > > > >>>>
>>>> > > > >>>>    1. Not all existing connectors match this rule.
>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may
>>>> > > > >>>>    contain hundreds (or even more) of snapshot split completion
>>>> > > > >>>>    records. This state is large and is currently transmitted
>>>> > > > >>>>    incrementally through multiple BinlogSplitMetaEvent messages.
>>>> > > > >>>>    Since the binlog reader operates with single parallelism,
>>>> > > > >>>>    reporting the full split state on recovery could be inefficient
>>>> > > > >>>>    or even infeasible.
>>>> > > > >>>>    For such sources, it would be better to provide a mechanism to
>>>> > > > >>>>    skip split reporting during restart until they redesign and
>>>> > > > >>>>    reduce the split size.
>>>> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka
>>>> > > > >>>>    connector) do not track or persistently manage unassigned
>>>> > > > >>>>    splits. Requiring them to handle re-registration would add
>>>> > > > >>>>    unnecessary complexity. Even if we implement this in the Kafka
>>>> > > > >>>>    connector, the Kafka connector is currently decoupled from the
>>>> > > > >>>>    Flink version, so we also need to make sure older versions
>>>> > > > >>>>    remain compatible.
>>>> > > > >>>>
>>>> > > > >>>> ------------------------------
>>>> > > > >>>>
>>>> > > > >>>> To address these concerns, I propose introducing a new
>>>> method:
>>>> > boolean
>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
>>>> > > > >>>> implementation returning false. This allows source readers
>>>> to opt
>>>> > in
>>>> > > > >>>> to split reassignment only when necessary. Since the new
>>>> contract
>>>> > > > already
>>>> > > > >>>> places the responsibility for split assignment on the
>>>> enumerator,
>>>> > not
>>>> > > > >>>> reporting splits by default is a safe and clean default
>>>> behavior.
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> ------------------------------
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> I’ve updated the implementation and the FLIP accordingly[1]. It is
>>>> > > > >>>> quite a big change. In particular, for the Kafka connector, we can
>>>> > > > >>>> now use a pluggable SplitPartitioner to support different split
>>>> > > > >>>> assignment strategies (e.g., default, round-robin).
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> Could you please review it when you have a chance?
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> Best,
>>>> > > > >>>>
>>>> > > > >>>> Hongshun
>>>> > > > >>>>
>>>> > > > >>>>
>>>> > > > >>>> [1]
>>>> > > > >>>>
>>>> > > >
>>>> >
>>>> >
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>> > > > >>>>
>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]>
>>>> > > > wrote:
>>>> > > > >>>>
>>>> > > > >>>>> Hi Hongshun,
>>>> > > > >>>>>
>>>> > > > >>>>> I am not too concerned about the transmission cost. Because
>>>> the
>>>> > full
>>>> > > > >>>>> split transmission has to happen in the initial assignment
>>>> phase
>>>> > > > already.
>>>> > > > >>>>> And in the future, we probably want to also introduce some
>>>> kind
>>>> > of
>>>> > > > workload
>>>> > > > >>>>> balance across source readers, e.g. based on the per-split
>>>> > > > throughput or
>>>> > > > >>>>> the per-source-reader workload in heterogeneous clusters.
>>>> For
>>>> > these
>>>> > > > >>>>> reasons, we do not expect the Split objects to be huge, and
>>>> we
>>>> > are
>>>> > > > not
>>>> > > > >>>>> trying to design for huge Split objects either as they will
>>>> have
>>>> > > > problems
>>>> > > > >>>>> even today.
>>>> > > > >>>>>
>>>> > > > >>>>> Good point on the potential split loss, please see the reply
>>>> > below:
>>>> > > > >>>>>
>>>> > > > >>>>> Scenario 2:
>>>> > > > >>>>>
>>>> > > > >>>>>
>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B reports
>>>> (3
>>>> > and 4)
>>>> > > > >>>>>> upon restart.
>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers
>>>> have
>>>> > empty
>>>> > > > >>>>>> states.
>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits
>>>> are
>>>> > lost.
>>>> > > > >>>>>
>>>> > > > >>>>> The reader registration happens in the
>>>> SourceOperator.open(),
>>>> > which
>>>> > > > >>>>> means the task is still in the initializing state,
>>>> therefore the
>>>> > > > checkpoint
>>>> > > > >>>>> should not be triggered until the enumerator receives all
>>>> the
>>>> > split
>>>> > > > reports.
>>>> > > > >>>>>
>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM
>>>> > > > >>>>> is async, so it is possible that SourceOperator.open() has
>>>> > > > >>>>> returned but the enumerator has not yet received the split
>>>> > > > >>>>> reports. However, because the task status update RPC call goes
>>>> > > > >>>>> through the same channel as the split reports call, the task
>>>> > > > >>>>> status RPC call will happen after the split reports call on the
>>>> > > > >>>>> JM side. Therefore, on the JM side, the SourceCoordinator will
>>>> > > > >>>>> always receive the split reports first and the checkpoint request
>>>> > > > >>>>> afterwards.
>>>> > > > >>>>> This "happens before" relationship is important for guaranteeing
>>>> > > > >>>>> consistent state between the enumerator and the readers.
>>>> > > > >>>>>
>>>> > > > >>>>> Scenario 1:
>>>> > > > >>>>>
>>>> > > > >>>>>
>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and
>>>> 2), and
>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>> > > > >>>>>> 2. The enumerator receives these reports but only reassigns
>>>> > splits 1
>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only
>>>> splits 1
>>>> > and 2
>>>> > > > >>>>>> are recorded in the reader states; splits 3 and 4 are not
>>>> > persisted.
>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint,
>>>> splits 3
>>>> > and
>>>> > > > 4
>>>> > > > >>>>>> will be permanently lost.
>>>> > > > >>>>>
>>>> > > > >>>>> This scenario is possible. One solution is to let the
>>>> enumerator
>>>> > > > >>>>> implementation handle this. That means if the enumerator
>>>> relies
>>>> > on
>>>> > > > the
>>>> > > > >>>>> initial split reports from the source readers, it should
>>>> maintain
>>>> > > > these
>>>> > > > >>>>> reports by itself. In the above example, the enumerator
>>>> will need
>>>> > to
>>>> > > > >>>>> remember that 3 and 4 are not assigned and put it into its
>>>> own
>>>> > state.
>>>> > > > >>>>> The current contract is that anything assigned to the
>>>> > SourceReaders
>>>> > > > >>>>> are completely owned by the SourceReaders. Enumerators can
>>>> > remember
>>>> > > > the
>>>> > > > >>>>> assignments but cannot change them, even when the source
>>>> reader
>>>> > > > recovers /
>>>> > > > >>>>> restarts.
>>>> > > > >>>>> With this FLIP, the contract becomes that the source
>>>> readers will
>>>> > > > >>>>> return the ownership of the splits to the enumerator. So the
>>>> > > > enumerator is
>>>> > > > >>>>> responsible for maintaining these splits, until they are
>>>> assigned
>>>> > to
>>>> > > > a
>>>> > > > >>>>> source reader again.
>>>> > > > >>>>>
>>>> > > > >>>>> There are other cases where there may be conflicting information
>>>> > > > >>>>> between reader and enumerator. For example, consider the following
>>>> > > > >>>>> sequence:
>>>> > > > >>>>> 1. Reader A reports splits (1 and 2) upon restart.
>>>> > > > >>>>> 2. The enumerator receives the report and assigns both 1 and 2 to
>>>> > > > >>>>> reader B.
>>>> > > > >>>>> 3. Reader A fails before checkpointing. This is a partial failure,
>>>> > > > >>>>> so only reader A restarts.
>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1 and 2)
>>>> > > > >>>>> to the enumerator.
>>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>> > > > >>>>>
>>>> > > > >>>>> So with the new contract, the enumerator should be the
>>>> source of
>>>> > > > truth
>>>> > > > >>>>> for split ownership.
>>>> > > > >>>>>
>>>> > > > >>>>> Thanks,
>>>> > > > >>>>>
>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>> > > > >>>>>
>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>> > > > [email protected]>
>>>> > > > >>>>> wrote:
>>>> > > > >>>>>
>>>> > > > >>>>>> Hi Becket,
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> I did consider this approach at the beginning (and it was
>>>> also
>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more
>>>> flexibility
>>>> > in
>>>> > > > >>>>>> reassigning all splits. However, there are a few potential
>>>> > issues.
>>>> > > > >>>>>>
>>>> > > > >>>>>> 1. High Transmission Cost
>>>> > > > >>>>>> If we pass the full split objects (rather than just split
>>>> IDs),
>>>> > the
>>>> > > > >>>>>> data size could be significant, leading to high overhead
>>>> during
>>>> > > > >>>>>> transmission — especially when many splits are involved.
>>>> > > > >>>>>>
>>>> > > > >>>>>> 2. Risk of Split Loss
>>>> > > > >>>>>>
>>>> > > > >>>>>> The risk of split loss exists unless we have a mechanism to make
>>>> > > > >>>>>> sure a checkpoint can only be taken after all the splits are
>>>> > > > >>>>>> reassigned.
>>>> > > > >>>>>> There are scenarios where splits could be lost due to
>>>> > inconsistent
>>>> > > > >>>>>> state handling during recovery:
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> Scenario 1:
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1
>>>> and 2),
>>>> > and
>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>> > > > >>>>>>    2. The enumerator receives these reports but only
>>>> reassigns
>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only
>>>> splits 1
>>>> > and
>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3 and 4 are
>>>> not
>>>> > > > persisted.
>>>> > > > >>>>>>    4. If the job is later restarted from this checkpoint,
>>>> splits
>>>> > 3
>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> Scenario 2:
>>>> > > > >>>>>>
>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B
>>>> reports (3
>>>> > and
>>>> > > > >>>>>>    4) upon restart.
>>>> > > > >>>>>>    2. Before the enumerator receives all reports and
>>>> performs
>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both
>>>> readers
>>>> > have
>>>> > > > >>>>>>    empty states.
>>>> > > > >>>>>>    4. When restarting from this checkpoint, all four
>>>> splits are
>>>> > > > lost.
>>>> > > > >>>>>>
>>>> > > > >>>>>>
>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate
>>>> these
>>>> > > > risks!
>>>> > > > >>>>>>
>>>> > > > >>>>>> Best
>>>> > > > >>>>>> Hongshun
>>>> > > > >>>>>>
>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <[email protected]
>>>> >
>>>> > > > >>>>>> wrote:
>>>> > > > >>>>>>
>>>> > > > >>>>>>> Hi Hongshun,
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of
>>>> the
>>>> > > > updated
>>>> > > > >>>>>>> FLIP wiki, it would be good to see if we can keep the
>>>> protocol
>>>> > > > simple. One
>>>> > > > >>>>>>> alternative way to achieve this behavior is following:
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned
>>>> splits to
>>>> > the
>>>> > > > >>>>>>> enumerator. It does not add these splits to the
>>>> SourceReader.
>>>> > > > >>>>>>> 2. The enumerator will always use
>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not
>>>> > > > >>>>>>> via the response to the ReaderRegistrationEvent; this allows
>>>> > > > >>>>>>> async split assignment in case the enumerator wants to wait
>>>> > > > >>>>>>> until all the readers are registered).
>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>> SourceReader.addSplits()
>>>> > when
>>>> > > > >>>>>>> it receives the AddSplitEvent from the enumerator.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> This protocol has a few benefits:
>>>> > > > >>>>>>> 1. it basically allows arbitrary split reassignment upon
>>>> > restart
>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign splits.
>>>> > > > >>>>>>>
>>>> > > > >>>>>>> So we only need one interface change:
>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
>>>> > Enumerator
>>>> > > > >>>>>>> can access it.
>>>> > > > >>>>>>> and one behavior change:
>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the
>>>> from
>>>> > state
>>>> > > > >>>>>>> restoration, but
>>>> > [message truncated...]
>>>> >
>>>>
>>>
>
