Hi devs,
If there are no further suggestions, I will start the vote tomorrow.

Best,
Hongshun

On Fri, Sep 26, 2025 at 7:48 PM Hongshun Wang <[email protected]>
wrote:

> Hi Becket and Leonard,
>
> I have updated the content of this FLIP. The key point is that:
>
> When the split enumerator receives a split, *the split must
> already exist in pendingSplitAssignments or assignedSplitAssignments*.
>
>    - If the split is in pendingSplitAssignments, ignore it.
>    - If the split is in assignedSplitAssignments but has a different
>    taskId, ignore it (this indicates it was already assigned to another
>    task).
>    - If the split is in assignedSplitAssignments and shares the same
>    taskId, move the assignment from assignedSplitAssignments to
>    pendingSplitAssignments so that it is assigned again.
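
The three rules above can be sketched roughly as follows. This is only an illustration, not the FLIP's implementation: the class name `ReportedSplitHandler`, the `Action` enum, the use of String split IDs, and the choice to throw on an unknown split are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the three rules; not taken from the FLIP. */
class ReportedSplitHandler {
    enum Action { IGNORE_PENDING, IGNORE_OWNED_BY_OTHER, REQUEUE }

    // splitId -> subtask the split is waiting for / currently assigned to
    final Map<String, Integer> pendingSplitAssignments = new HashMap<>();
    final Map<String, Integer> assignedSplitAssignments = new HashMap<>();

    Action onSplitReported(String splitId, int taskId) {
        // Rule 1: already pending, nothing to do.
        if (pendingSplitAssignments.containsKey(splitId)) {
            return Action.IGNORE_PENDING;
        }
        Integer owner = assignedSplitAssignments.get(splitId);
        if (owner == null) {
            // Assumption: a reported split must be known to the enumerator.
            throw new IllegalStateException("Unknown split: " + splitId);
        }
        // Rule 2: the split was already handed to another task.
        if (owner != taskId) {
            return Action.IGNORE_OWNED_BY_OTHER;
        }
        // Rule 3: same task, so move it back to pending for re-assignment.
        assignedSplitAssignments.remove(splitId);
        pendingSplitAssignments.put(splitId, taskId);
        return Action.REQUEUE;
    }
}
```

Only the third branch mutates state, so a stale report from a restarted reader cannot take a split away from its current owner.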
>
>
> To make it easier to understand why these strategies are used, I added
> some examples and pictures to illustrate them.
>
> Could you help me check whether any problems remain?
>
> Best
> Hongshun
>
>
>
> On Fri, Sep 26, 2025 at 5:08 PM Leonard Xu <[email protected]> wrote:
>
>> Thanks Becket and Hongshun for the insightful discussion.
>>
>> The underlying implementation and communication mechanisms of Flink
>> Source indeed involve many intricate details. We discussed the issue of
>> split reassignment in specific scenarios and, fortunately, the final
>> decision turned out to be pretty clear.
>>
>> +1 to Becket’s proposal, which keeps the framework cleaner and more
>> flexible.
>> +1 to Hongshun’s point to provide comprehensive guidance for connector
>> developers.
>>
>>
>> Best,
>> Leonard
>>
>>
>>
>> On Sep 26, 2025, at 16:30, Hongshun Wang <[email protected]> wrote:
>>
>> Hi Becket,
>>
>> I got it. You’re suggesting we should not handle this in the source
>> framework but instead let the split enumerator manage these three scenarios.
>>
>> Let me explain why I originally favored handling it in the framework: I'm
>> concerned that connector developers might overlook certain edge cases
>> (after all, even we needed extensive discussion to fully clarify the logic).
>>
>> However, your approach keeps the framework cleaner and more flexible, so
>> I will adopt it.
>>
>> Perhaps, in this FLIP, we should focus on providing comprehensive
>> guidance for connector developers: explain how to implement a split
>> enumerator, including the underlying challenges and their solutions.
>>
>>
>> Additionally, we can use the Kafka connector as a reference
>> implementation to demonstrate the practical steps. This way, developers who
>> want to implement similar connectors can directly reference this example.
>>
>>
>> Best,
>> Hongshun
>>
>>
>>
>> On Fri, Sep 26, 2025 at 1:27 PM Becket Qin <[email protected]> wrote:
>>
>>> It would be good to not expose runtime details to the source
>>> implementation if possible.
>>>
>>> Today, the split enumerator implementations are expected to track the
>>> split assignment.
>>>
>>> Assuming the split enumerator implementation keeps a split assignment
>>> map, that means the enumerator should already know whether a split is
>>> assigned or unassigned. So it can handle the three scenarios you mentioned.
>>>
>>> The split is reported by a reader during a global restoration.
>>>>
>>> The split enumerator should have just been restored / created. If the
>>> enumerator expects a full reassignment of splits upon global recovery,
>>> there should be no assigned splits to that reader in the split assignment
>>> mapping.
>>>
>>> The split is reported by a reader during a partial failure recovery.
>>>>
>>> In this case, when SplitEnumerator.addReader() is invoked, the split
>>> assignment map in the enumerator implementation should already have some
>>> split assignments for the reader. Therefore it is a partial failover. If
>>> the source supports split reassignment on recovery, the enumerator can
>>> assign splits that are different from the reported assignment of that
>>> reader in the SplitEnumeratorContext, or it can also assign the same
>>> splits. In any case, the enumerator knows that this is a partial recovery
>>> because the assignment map is non-empty.
>>>
>>> The split is not reported by a reader, but is assigned after the last
>>>> successful checkpoint and was never acknowledged.
>>>
>>> This is actually one of the steps in the partial failure recovery.
>>> SplitEnumerator.addSplitsBack() will be called first, before
>>> SplitEnumerator.addReader() is called for the recovered reader. When
>>> SplitEnumerator.addSplitsBack() is invoked, it is certainly a partial
>>> recovery, and the enumerator should remove these splits from the split
>>> assignment map as if they were never assigned.
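
A rough sketch of the bookkeeping described above, assuming an enumerator that tracks assignments per subtask. It does not implement Flink's SplitEnumerator interface; `AssignmentTrackingEnumerator`, `recordAssignment`, and `isPartialRecovery` are hypothetical names, and splits are represented by String IDs for brevity.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical enumerator-side bookkeeping; not Flink's actual interface. */
class AssignmentTrackingEnumerator {
    // subtaskId -> IDs of the splits currently assigned to that reader
    private final Map<Integer, Set<String>> assignment = new HashMap<>();
    private final List<String> unassigned = new ArrayList<>();

    void recordAssignment(int subtaskId, String splitId) {
        assignment.computeIfAbsent(subtaskId, k -> new HashSet<>()).add(splitId);
    }

    /** Plays the role of addSplitsBack(): forget assignments that were never acknowledged. */
    void addSplitsBack(List<String> splits, int subtaskId) {
        Set<String> owned = assignment.get(subtaskId);
        if (owned != null) {
            owned.removeAll(splits);
            if (owned.isEmpty()) {
                assignment.remove(subtaskId);
            }
        }
        unassigned.addAll(splits);
    }

    /** What addReader() could check: existing entries for the reader imply a partial failover. */
    boolean isPartialRecovery(int subtaskId) {
        return assignment.containsKey(subtaskId);
    }

    List<String> unassignedSplits() {
        return unassigned;
    }
}
```

One caveat of this simplification: if every split of a reader is returned through addSplitsBack(), the map no longer holds an entry for it, so a real enumerator would probably also remember that addSplitsBack() was called for that subtask.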
>>>
>>> I think this should work, right?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Thu, Sep 25, 2025 at 8:34 PM Hongshun Wang <[email protected]>
>>> wrote:
>>>
>>>> Hi Becket and Leonard,
>>>>
>>>> Thanks for your advice.
>>>>
>>>> > put all the reader information in the SplitEnumerator context
>>>> I have a concern: the current registeredReaders in
>>>> *SourceCoordinatorContext will be removed after subtaskResetor execution
>>>> on failure*. However, this approach has merit.
>>>>
>>>> I found one more situation that my previous design does not cover:
>>>>
>>>>    1. Initial state: Reader A reports splits (1, 2).
>>>>    2. Enumerator action: Assigns split 1 to Reader A, and split 2 to
>>>>    Reader B.
>>>>    3. Failure scenario: Reader A fails before checkpointing. Since
>>>>    this is a partial failure, only Reader A restarts.
>>>>    4. Recovery issue: Upon recovery, Reader A re-reports split (1).
>>>>
>>>> In my previous design, the enumerator will ignore Reader A's
>>>> re-registration which will cause data loss.
>>>>
>>>> Thus, when the enumerator receives a split, the split may originate
>>>> from three scenarios:
>>>>
>>>>    1. The split is reported by a reader during a global restoration.
>>>>    2. The split is reported by a reader during a partial failure
>>>>    recovery.
>>>>    3. The split is not reported by a reader, but is assigned after the
>>>>    last successful checkpoint and was never acknowledged.
>>>>
>>>> In the first scenario (global restore), the split should
>>>> be re-distributed. For the latter two scenarios (partial failover and
>>>> post-checkpoint assignment), we need to reassign the split to
>>>> its originally assigned subtask.
>>>>
>>>> By implementing a method in the SplitEnumerator context to track each
>>>> assigned split's status, the system can correctly identify and resolve
>>>> split ownership in all three scenarios. *What about adding a
>>>> `SplitRecoveryType splitRecoveryType(Split split)` method to
>>>> SplitEnumeratorContext?* SplitRecoveryType is an enum including
>>>> `UNASSIGNED`, `GLOBAL_RESTORE`, `PARTIAL_FAILOVER`, and
>>>> `POST_CHECKPOINT_ASSIGNMENT`.
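
For illustration only, the proposed enum and the strategy described above (redistribute on global restore, keep the original subtask otherwise) might look like the sketch below. The enum values come from this message; the `RecoveryPolicy` class, the `keepOriginalOwner` method, and the treatment of `UNASSIGNED` are assumptions.

```java
/** Enum values as proposed in this message. */
enum SplitRecoveryType {
    UNASSIGNED,
    GLOBAL_RESTORE,
    PARTIAL_FAILOVER,
    POST_CHECKPOINT_ASSIGNMENT
}

/** Hypothetical helper showing how an enumerator might act on the type. */
class RecoveryPolicy {
    /** Returns true when the split should go back to its original subtask. */
    static boolean keepOriginalOwner(SplitRecoveryType type) {
        switch (type) {
            case PARTIAL_FAILOVER:
            case POST_CHECKPOINT_ASSIGNMENT:
                return true;
            case GLOBAL_RESTORE:
                return false; // free to redistribute across readers
            case UNASSIGNED:
            default:
                return false; // assumption: no original owner exists
        }
    }
}
```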
>>>>
>>>> What do you think? Are there any details or scenarios I haven't
>>>> considered? Looking forward to your advice.
>>>>
>>>> Best,
>>>> Hongshun
>>>>
>>>> On Thu, Sep 11, 2025 at 12:41 AM Becket Qin <[email protected]>
>>>> wrote:
>>>>
>>>>> Thanks for the explanation, Hongshun.
>>>>>
>>>>> The current pattern of handling new reader registration is as follows:
>>>>> 1. put all the reader information in the SplitEnumerator context
>>>>> 2. notify the enumerator about the new reader registration.
>>>>> 3. Let the split enumerator get whatever information it wants from the
>>>>> context and do its job.
>>>>> This pattern decouples the information passing and the reader
>>>>> registration
>>>>> notification. This makes the API extensible - we can add more
>>>>> information
>>>>> (e.g. reported assigned splits in our case) about the reader to the
>>>>> context
>>>>> without introducing new methods.
>>>>>
>>>>> Introducing a new addSplitsBackOnRecovery() method is redundant given
>>>>> the above pattern. Do we really need it?
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Mon, Sep 8, 2025 at 8:18 PM Hongshun Wang <[email protected]>
>>>>> wrote:
>>>>>
>>>>> > Hi Becket,
>>>>> >
>>>>> > > I am curious what would the enumerator do differently for the
>>>>> splits
>>>>> > added via addSplitsBackOnRecovery() V.S. addSplitsBack()?
>>>>> >
>>>>> > In this FLIP, there are two distinct scenarios in which the enumerator
>>>>> > receives splits being added back:
>>>>> > 1. Job-level restore: the job is restored, and splits from the readers'
>>>>> > state are reported via ReaderRegistrationEvent.
>>>>> > 2. Reader-level restart: a single reader is restarted, not the whole job,
>>>>> > and the splits assigned to it after the last successful checkpoint are
>>>>> > added back. This is what addSplitsBack used to do.
>>>>> >
>>>>> >
>>>>> > In these two situations, the enumerator will choose different strategies:
>>>>> > 1. Job-level restore: the splits should be redistributed across readers
>>>>> > according to the current partitioner strategy.
>>>>> > 2. Reader-level restart: the splits should be reassigned directly back to
>>>>> > the same reader they were originally assigned to, preserving locality and
>>>>> > avoiding unnecessary redistribution.
>>>>> >
>>>>> > Therefore, the enumerator must clearly distinguish between these two
>>>>> > scenarios. I originally deprecated the former addSplitsBack(List<SplitT>
>>>>> > splits, int subtaskId) and added a new addSplitsBack(List<SplitT>
>>>>> > splits, int subtaskId, boolean reportedByReader).
>>>>> > Leonard suggested using a separate method, addSplitsBackOnRecovery,
>>>>> > that leaves the current addSplitsBack unaffected.
>>>>> >
>>>>> > Best
>>>>> > Hongshun
>>>>> >
>>>>> >
>>>>> >
>>>>> > On 2025/09/08 17:20:31 Becket Qin wrote:
>>>>> > > Hi Leonard,
>>>>> > >
>>>>> > >
>>>>> > > > Could we introduce a new method like addSplitsBackOnRecovery
>>>>> with
>>>>> > default
>>>>> > > > implementation. In this way, we can provide better backward
>>>>> > compatibility
>>>>> > > > and also makes it easier for developers to understand.
>>>>> > >
>>>>> > >
>>>>> > > I am curious what would the enumerator do differently for the
>>>>> splits
>>>>> > added
>>>>> > > via addSplitsBackOnRecovery() V.S. addSplitsBack()?  Today,
>>>>> > addSplitsBack()
>>>>> > > is also only called upon recovery. So the new method seems
>>>>> confusing. One
>>>>> > > thing worth clarifying is if the Source implements
>>>>> > > SupportSplitReassignmentOnRecovery, upon recovery, should the
>>>>> splits
>>>>> > > reported by the readers also be added back to the SplitEnumerator
>>>>> via the
>>>>> > > addSplitsBack() call? Or should the SplitEnumerator explicitly
>>>>> query the
>>>>> > > registered reader information via the SplitEnumeratorContext to
>>>>> get the
>>>>> > > originally assigned splits when addReader() is invoked? I was
>>>>> assuming
>>>>> > the
>>>>> > > latter in the beginning, so the behavior of addSplitsBack() remains
>>>>> > > unchanged, but I am not opposed in doing the former.
>>>>> > >
>>>>> > > Also, can you elaborate on the backwards compatibility issue you
>>>>> see if
>>>>> > we
>>>>> > > do not have a separate addSplitsBackOnRecovery() method? Even
>>>>> without
>>>>> > this
>>>>> > > new method, the behavior remains exactly the same unless the end
>>>>> users
>>>>> > > implement the mix-in interface of
>>>>> "SupportSplitReassignmentOnRecovery",
>>>>> > > right?
>>>>> > >
>>>>> > > Thanks,
>>>>> > >
>>>>> > > Jiangjie (Becket) Qin
>>>>> > >
>>>>> > > On Mon, Sep 8, 2025 at 1:48 AM Hongshun Wang <[email protected]>
>>>>> > > wrote:
>>>>> > >
>>>>> > > > Hi devs,
>>>>> > > >
>>>>> > > > It has been quite some time since this FLIP[1] was first
>>>>> proposed.
>>>>> > Thank
>>>>> > > > you for your valuable feedback—based on your suggestions, the
>>>>> FLIP has
>>>>> > > > undergone several rounds of revisions.
>>>>> > > >
>>>>> > > > Any more advice is welcome and appreciated. If there are no
>>>>> further
>>>>> > > > concerns, I plan to start the vote tomorrow.
>>>>> > > >
>>>>> > > > Best
>>>>> > > > Hongshun
>>>>> > > >
>>>>> > > > [1]
>>>>> > > >
>>>>> >
>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=373886480
>>>>> > > >
>>>>> > > > On Mon, Sep 8, 2025 at 4:42 PM Hongshun Wang <[email protected]>
>>>>> > > > wrote:
>>>>> > > >
>>>>> > > > > Hi Leonard,
>>>>> > > > > Thanks for your advice.  It makes sense and I have modified it.
>>>>> > > > >
>>>>> > > > > Best,
>>>>> > > > > Hongshun
>>>>> > > > >
>>>>> > > > > On Mon, Sep 8, 2025 at 11:40 AM Leonard Xu <[email protected]>
>>>>> wrote:
>>>>> > > > >
>>>>> > > > >> Thanks Hongshun and Becket for the deep discussion.
>>>>> > > > >>
>>>>> > > > >> I only have one comment for one API design:
>>>>> > > > >>
>>>>> > > > >> > Deprecate the old addSplitsBack  method, use a
>>>>> addSplitsBack with
>>>>> > > > >> param isReportedByReader instead. Because, The enumerator can
>>>>> apply
>>>>> > > > >> different reassignment policies based on the context.
>>>>> > > > >>
>>>>> > > > >> Could we introduce a new method like
>>>>> *addSplitsBackOnRecovery*  with
>>>>> > > > default
>>>>> > > > >> implementation. In this way, we can provide better backward
>>>>> > > > >> compatibility and also makes it easier for developers to
>>>>> understand.
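
To illustrate the backward-compatibility argument, here is a minimal sketch of a new method with a default implementation. `SketchEnumerator` and `LegacyEnumerator` are hypothetical stand-ins, not Flink's SplitEnumerator, and delegating to addSplitsBack() is an assumed default that the thread does not specify.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for SplitEnumerator with only the two methods discussed. */
interface SketchEnumerator<SplitT> {
    void addSplitsBack(List<SplitT> splits, int subtaskId);

    // Assumed default: fall back to the existing behavior, so enumerators
    // written before the new method existed compile and behave as before.
    default void addSplitsBackOnRecovery(List<SplitT> splits, int subtaskId) {
        addSplitsBack(splits, subtaskId);
    }
}

/** An "old" enumerator that knows nothing about the new method. */
class LegacyEnumerator implements SketchEnumerator<String> {
    final List<String> returned = new ArrayList<>();

    @Override
    public void addSplitsBack(List<String> splits, int subtaskId) {
        returned.addAll(splits);
    }
}
```

An enumerator that wants a different policy for reader-reported splits would override addSplitsBackOnRecovery(); everything else keeps working unchanged.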
>>>>> > > > >>
>>>>> > > > >> Best,
>>>>> > > > >> Leonard
>>>>> > > > >>
>>>>> > > > >>
>>>>> > > > >>
>>>>> > > > >> On Sep 3, 2025, at 20:26, Hongshun Wang <[email protected]> wrote:
>>>>> > > > >>
>>>>> > > > >> Hi Becket,
>>>>> > > > >>
>>>>> > > > >> I think that's a great idea! I have added the
>>>>> > > > >> SupportSplitReassignmentOnRecovery interface in this FLIP. A Source
>>>>> > > > >> that implements this interface indicates that the source operator
>>>>> > > > >> needs to report splits to the enumerator and receive reassignment. [1]
>>>>> > > > >>
>>>>> > > > >> Best,
>>>>> > > > >> Hongshun
>>>>> > > > >>
>>>>> > > > >> [1]
>>>>> > > > >>
>>>>> > > >
>>>>> >
>>>>> >
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>> > > > >>
>>>>> > > > >> On Thu, Aug 21, 2025 at 12:09 PM Becket Qin <[email protected]>
>>>>> > > > wrote:
>>>>> > > > >>
>>>>> > > > >>> Hi Hongshun,
>>>>> > > > >>>
>>>>> > > > >>> I think the convention for such optional features in Source
>>>>> is via
>>>>> > > > >>> mix-in interfaces. So instead of adding a method to the
>>>>> > SourceReader,
>>>>> > > > maybe
>>>>> > > > >>> we should introduce an interface
>>>>> SupportSplitReassignmentOnRecovery
>>>>> > > > with
>>>>> > > > >>> this method. If a Source implementation implements that
>>>>> interface,
>>>>> > > > then the
>>>>> > > > >>> SourceOperator will check the desired behavior and act
>>>>> accordingly.
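
The mix-in convention can be sketched as below. The interface name is the one used later in this thread; treating it as a pure marker interface is an assumption, and `SketchSource`, `PlainSource`, `ReassigningSource`, and `SketchSourceOperator` are hypothetical stand-ins for the real Flink classes.

```java
/** Mix-in interface named in this thread; modeled here as a marker (assumption). */
interface SupportSplitReassignmentOnRecovery {}

/** Hypothetical stand-in for Flink's Source. */
interface SketchSource {}

/** A source that keeps the existing behavior. */
class PlainSource implements SketchSource {}

/** A source that opts in to reporting splits for reassignment. */
class ReassigningSource implements SketchSource, SupportSplitReassignmentOnRecovery {}

/** Hypothetical operator-side check. */
class SketchSourceOperator {
    static boolean shouldReportSplitsOnRecovery(SketchSource source) {
        // Opt-in: only sources that carry the mix-in change behavior.
        return source instanceof SupportSplitReassignmentOnRecovery;
    }
}
```

Sources that do not implement the mix-in are untouched, which is why this style suits optional features.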
>>>>> > > > >>>
>>>>> > > > >>> Thanks,
>>>>> > > > >>>
>>>>> > > > >>> Jiangjie (Becket) Qin
>>>>> > > > >>>
>>>>> > > > >>> On Wed, Aug 20, 2025 at 8:52 PM Hongshun Wang <
>>>>> > [email protected]
>>>>> > > > >
>>>>> > > > >>> wrote:
>>>>> > > > >>>
>>>>> > > > >>>> Hi devs,
>>>>> > > > >>>>
>>>>> > > > >>>> Would anyone like to discuss this FLIP? I'd appreciate your
>>>>> > feedback
>>>>> > > > >>>> and suggestions.
>>>>> > > > >>>>
>>>>> > > > >>>> Best,
>>>>> > > > >>>> Hongshun
>>>>> > > > >>>>
>>>>> > > > >>>>
>>>>> > > > >>>> On Aug 13, 2025, at 14:23, Hongshun Wang <[email protected]> wrote:
>>>>> > > > >>>>
>>>>> > > > >>>> Hi Becket,
>>>>> > > > >>>>
>>>>> > > > >>>> Thank you for your detailed feedback. The new contract
>>>>> makes good
>>>>> > > > sense
>>>>> > > > >>>> to me and effectively addresses the issues I encountered at
>>>>> the
>>>>> > > > beginning
>>>>> > > > >>>> of the design.
>>>>> > > > >>>>
>>>>> > > > >>>> That said, I recommend not reporting splits by default,
>>>>> primarily
>>>>> > for
>>>>> > > > >>>> compatibility and practical reasons:
>>>>> > > > >>>>
>>>>> > > > >>>> >  For these reasons, we do not expect the Split objects to
>>>>> be
>>>>> > huge,
>>>>> > > > >>>> and we are not trying to design for huge Split objects
>>>>> either as
>>>>> > they
>>>>> > > > will
>>>>> > > > >>>> have problems even today.
>>>>> > > > >>>>
>>>>> > > > >>>>    1. Not all existing connectors match this rule.
>>>>> > > > >>>>    For example, in the MySQL CDC connector, a binlog split may contain
>>>>> > > > >>>>    hundreds (or even more) of snapshot split completion records. This
>>>>> > > > >>>>    state is large and is currently transmitted incrementally through
>>>>> > > > >>>>    multiple BinlogSplitMetaEvent messages. Since the binlog reader
>>>>> > > > >>>>    operates with single parallelism, reporting the full split state on
>>>>> > > > >>>>    recovery could be inefficient or even infeasible.
>>>>> > > > >>>>    For such sources, it would be better to provide a mechanism to skip
>>>>> > > > >>>>    split reporting during restart until they are redesigned to reduce
>>>>> > > > >>>>    the split size.
>>>>> > > > >>>>    2. Not all enumerators maintain unassigned splits in state.
>>>>> > > > >>>>    Some SplitEnumerator implementations (such as the Kafka connector's)
>>>>> > > > >>>>    do not track or persistently manage unassigned splits. Requiring
>>>>> > > > >>>>    them to handle re-registration would add unnecessary complexity.
>>>>> > > > >>>>    Even if we implement this in the Kafka connector, the Kafka
>>>>> > > > >>>>    connector is currently decoupled from the Flink version, so we also
>>>>> > > > >>>>    need to make sure older versions remain compatible.
>>>>> > > > >>>>
>>>>> > > > >>>> ------------------------------
>>>>> > > > >>>>
>>>>> > > > >>>> To address these concerns, I propose introducing a new
>>>>> method:
>>>>> > boolean
>>>>> > > > >>>> SourceReader#shouldReassignSplitsOnRecovery() with a default
>>>>> > > > >>>> implementation returning false. This allows source readers
>>>>> to opt
>>>>> > in
>>>>> > > > >>>> to split reassignment only when necessary. Since the new
>>>>> contract
>>>>> > > > already
>>>>> > > > >>>> places the responsibility for split assignment on the
>>>>> enumerator,
>>>>> > not
>>>>> > > > >>>> reporting splits by default is a safe and clean default
>>>>> behavior.
>>>>> > > > >>>>
>>>>> > > > >>>>
>>>>> > > > >>>> ------------------------------
>>>>> > > > >>>>
>>>>> > > > >>>>
>>>>> > > > >>>> I’ve updated the implementation and the FLIP accordingly[1]. It is
>>>>> > > > >>>> quite a big change. In particular, for the Kafka connector, we can
>>>>> > > > >>>> now use a pluggable SplitPartitioner to support different split
>>>>> > > > >>>> assignment strategies (e.g., default, round-robin).
>>>>> > > > >>>>
>>>>> > > > >>>>
>>>>> > > > >>>> Could you please review it when you have a chance?
>>>>> > > > >>>>
>>>>> > > > >>>>
>>>>> > > > >>>> Best,
>>>>> > > > >>>>
>>>>> > > > >>>> Hongshun
>>>>> > > > >>>>
>>>>> > > > >>>>
>>>>> > > > >>>> [1]
>>>>> > > > >>>>
>>>>> > > >
>>>>> >
>>>>> >
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-537%3A+Enumerator+with+Global+Split+Assignment+Distribution+for+Balanced+Split+assignment
>>>>> > > > >>>>
>>>>> > > > >>>> On Sat, Aug 9, 2025 at 3:03 AM Becket Qin <[email protected]>
>>>>> > > > wrote:
>>>>> > > > >>>>
>>>>> > > > >>>>> Hi Hongshun,
>>>>> > > > >>>>>
>>>>> > > > >>>>> I am not too concerned about the transmission cost, because the
>>>>> > > > >>>>> full split transmission already has to happen in the initial
>>>>> > > > >>>>> assignment phase.
>>>>> > > > >>>>> And in the future, we probably want to also introduce some
>>>>> kind
>>>>> > of
>>>>> > > > workload
>>>>> > > > >>>>> balance across source readers, e.g. based on the per-split
>>>>> > > > throughput or
>>>>> > > > >>>>> the per-source-reader workload in heterogeneous clusters.
>>>>> For
>>>>> > these
>>>>> > > > >>>>> reasons, we do not expect the Split objects to be huge,
>>>>> and we
>>>>> > are
>>>>> > > > not
>>>>> > > > >>>>> trying to design for huge Split objects either as they
>>>>> will have
>>>>> > > > problems
>>>>> > > > >>>>> even today.
>>>>> > > > >>>>>
>>>>> > > > >>>>> Good point on the potential split loss, please see the
>>>>> reply
>>>>> > below:
>>>>> > > > >>>>>
>>>>> > > > >>>>> Scenario 2:
>>>>> > > > >>>>>
>>>>> > > > >>>>>
>>>>> > > > >>>>>> 1. Reader A reports splits (1 and 2), and Reader B
>>>>> reports (3
>>>>> > and 4)
>>>>> > > > >>>>>> upon restart.
>>>>> > > > >>>>>> 2. Before the enumerator receives all reports and performs
>>>>> > > > >>>>>> reassignment, a checkpoint is triggered.
>>>>> > > > >>>>>> 3. Since no splits have been reassigned yet, both readers
>>>>> have
>>>>> > empty
>>>>> > > > >>>>>> states.
>>>>> > > > >>>>>> 4. When restarting from this checkpoint, all four splits
>>>>> are
>>>>> > lost.
>>>>> > > > >>>>>
>>>>> > > > >>>>> The reader registration happens in the
>>>>> SourceOperator.open(),
>>>>> > which
>>>>> > > > >>>>> means the task is still in the initializing state,
>>>>> therefore the
>>>>> > > > checkpoint
>>>>> > > > >>>>> should not be triggered until the enumerator receives all
>>>>> the
>>>>> > split
>>>>> > > > reports.
>>>>> > > > >>>>>
>>>>> > > > >>>>> There is a nuance here. Today, the RPC call from the TM to the JM
>>>>> > > > >>>>> is async, so it is possible that SourceOperator.open() has
>>>>> > > > >>>>> returned but the enumerator has not yet received the split
>>>>> > > > >>>>> reports. However, because the task status update RPC call goes
>>>>> > > > >>>>> through the same channel as the split reports call, the task
>>>>> > > > >>>>> status RPC call will happen after the split reports call on the
>>>>> > > > >>>>> JM side. Therefore, on the JM side, the SourceCoordinator will
>>>>> > > > >>>>> always receive the split reports first and the checkpoint
>>>>> > > > >>>>> request afterwards.
>>>>> > > > >>>>> This "happens-before" relationship is important for guaranteeing
>>>>> > > > >>>>> consistent state between the enumerator and the readers.
>>>>> > > > >>>>>
>>>>> > > > >>>>> Scenario 1:
>>>>> > > > >>>>>
>>>>> > > > >>>>>
>>>>> > > > >>>>>> 1. Upon restart, Reader A reports assigned splits (1 and
>>>>> 2), and
>>>>> > > > >>>>>> Reader B reports (3 and 4).
>>>>> > > > >>>>>> 2. The enumerator receives these reports but only
>>>>> reassigns
>>>>> > splits 1
>>>>> > > > >>>>>> and 2 — not 3 and 4.
>>>>> > > > >>>>>> 3. A checkpoint or savepoint is then triggered. Only
>>>>> splits 1
>>>>> > and 2
>>>>> > > > >>>>>> are recorded in the reader states; splits 3 and 4 are not
>>>>> > persisted.
>>>>> > > > >>>>>> 4. If the job is later restarted from this checkpoint,
>>>>> splits 3
>>>>> > and
>>>>> > > > 4
>>>>> > > > >>>>>> will be permanently lost.
>>>>> > > > >>>>>
>>>>> > > > >>>>> This scenario is possible. One solution is to let the
>>>>> enumerator
>>>>> > > > >>>>> implementation handle this. That means if the enumerator
>>>>> relies
>>>>> > on
>>>>> > > > the
>>>>> > > > >>>>> initial split reports from the source readers, it should
>>>>> maintain
>>>>> > > > these
>>>>> > > > >>>>> reports by itself. In the above example, the enumerator will need
>>>>> > > > >>>>> to remember that 3 and 4 are not assigned and put them into its
>>>>> > > > >>>>> own state.
>>>>> > > > >>>>> The current contract is that anything assigned to the
>>>>> > > > >>>>> SourceReaders is completely owned by the SourceReaders.
>>>>> > > > >>>>> Enumerators can remember the assignments but cannot change them,
>>>>> > > > >>>>> even when the source reader recovers / restarts.
>>>>> > > > >>>>> With this FLIP, the contract becomes that the source
>>>>> readers will
>>>>> > > > >>>>> return the ownership of the splits to the enumerator. So
>>>>> the
>>>>> > > > enumerator is
>>>>> > > > >>>>> responsible for maintaining these splits, until they are
>>>>> assigned
>>>>> > to
>>>>> > > > a
>>>>> > > > >>>>> source reader again.
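
A small sketch of the ownership idea above, replaying Scenario 1: the enumerator keeps every reported split in its own state until it is assigned again, so a checkpoint taken in between still contains splits 3 and 4. `OwnershipSketch` and its methods are hypothetical, and splits are plain String IDs.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical enumerator state that owns reported splits until reassigned. */
class OwnershipSketch {
    private final List<String> unassigned = new ArrayList<>();

    /** Readers return ownership of their splits on registration. */
    void onReaderReport(List<String> reportedSplits) {
        unassigned.addAll(reportedSplits);
    }

    /** Hands out up to {@code count} splits and drops them from enumerator state. */
    List<String> assign(int count) {
        int n = Math.min(count, unassigned.size());
        List<String> out = new ArrayList<>(unassigned.subList(0, n));
        unassigned.subList(0, n).clear();
        return out;
    }

    /** What the enumerator would write into its checkpoint. */
    List<String> snapshotState() {
        return new ArrayList<>(unassigned);
    }
}
```

With reports (1, 2) and (3, 4) and only two splits assigned, the snapshot still holds the other two, so restarting from that checkpoint does not lose them.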
>>>>> > > > >>>>>
>>>>> > > > >>>>> There are other cases where there may be conflicting information
>>>>> > > > >>>>> between reader and enumerator. For example, consider the following
>>>>> > > > >>>>> sequence:
>>>>> > > > >>>>> 1. reader A reports splits (1 and 2) upon restart.
>>>>> > > > >>>>> 2. enumerator receives the report and assigns both 1 and 2
>>>>> to
>>>>> > reader
>>>>> > > > B.
>>>>> > > > >>>>> 3. reader A failed before checkpointing. And this is a
>>>>> partial
>>>>> > > > >>>>> failure, so only reader A restarts.
>>>>> > > > >>>>> 4. When reader A recovers, it will again report splits (1
>>>>> and 2)
>>>>> > to
>>>>> > > > >>>>> the enumerator.
>>>>> > > > >>>>> 5. The enumerator should ignore this report because it has
>>>>> > > > >>>>> assigned splits (1 and 2) to reader B.
>>>>> > > > >>>>>
>>>>> > > > >>>>> So with the new contract, the enumerator should be the
>>>>> source of
>>>>> > > > truth
>>>>> > > > >>>>> for split ownership.
>>>>> > > > >>>>>
>>>>> > > > >>>>> Thanks,
>>>>> > > > >>>>>
>>>>> > > > >>>>> Jiangjie (Becket) Qin
>>>>> > > > >>>>>
>>>>> > > > >>>>> On Fri, Aug 8, 2025 at 12:58 AM Hongshun Wang <
>>>>> > > > [email protected]>
>>>>> > > > >>>>> wrote:
>>>>> > > > >>>>>
>>>>> > > > >>>>>> Hi Becket,
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> I did consider this approach at the beginning (and it was
>>>>> also
>>>>> > > > >>>>>> mentioned in this FLIP), since it would allow more
>>>>> flexibility
>>>>> > in
>>>>> > > > >>>>>> reassigning all splits. However, there are a few potential
>>>>> > issues.
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> 1. High Transmission Cost
>>>>> > > > >>>>>> If we pass the full split objects (rather than just split
>>>>> IDs),
>>>>> > the
>>>>> > > > >>>>>> data size could be significant, leading to high overhead
>>>>> during
>>>>> > > > >>>>>> transmission — especially when many splits are involved.
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> 2. Risk of Split Loss
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> Risk of split loss exists unless we have a mechanism to make sure
>>>>> > > > >>>>>> a checkpoint can only be taken after all the splits are reassigned.
>>>>> > > > >>>>>> There are scenarios where splits could be lost due to inconsistent
>>>>> > > > >>>>>> state handling during recovery:
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> Scenario 1:
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>    1. Upon restart, Reader A reports assigned splits (1
>>>>> and 2),
>>>>> > and
>>>>> > > > >>>>>>    Reader B reports (3 and 4).
>>>>> > > > >>>>>>    2. The enumerator receives these reports but only
>>>>> reassigns
>>>>> > > > >>>>>>    splits 1 and 2 — not 3 and 4.
>>>>> > > > >>>>>>    3. A checkpoint or savepoint is then triggered. Only
>>>>> splits 1
>>>>> > and
>>>>> > > > >>>>>>    2 are recorded in the reader states; splits 3 and 4
>>>>> are not
>>>>> > > > persisted.
>>>>> > > > >>>>>>    4. If the job is later restarted from this checkpoint,
>>>>> splits
>>>>> > 3
>>>>> > > > >>>>>>    and 4 will be permanently lost.
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> Scenario 2:
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>    1. Reader A reports splits (1 and 2), and Reader B
>>>>> reports (3
>>>>> > and
>>>>> > > > >>>>>>    4) upon restart.
>>>>> > > > >>>>>>    2. Before the enumerator receives all reports and
>>>>> performs
>>>>> > > > >>>>>>    reassignment, a checkpoint is triggered.
>>>>> > > > >>>>>>    3. Since no splits have been reassigned yet, both
>>>>> readers
>>>>> > have
>>>>> > > > >>>>>>    empty states.
>>>>> > > > >>>>>>    4. When restarting from this checkpoint, all four
>>>>> splits are
>>>>> > > > lost.
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> Let me know if you have thoughts on how we might mitigate
>>>>> these
>>>>> > > > risks!
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> Best
>>>>> > > > >>>>>> Hongshun
>>>>> > > > >>>>>>
>>>>> > > > >>>>>> On Fri, Aug 8, 2025 at 1:46 AM Becket Qin <
>>>>> [email protected]>
>>>>> > > > >>>>>> wrote:
>>>>> > > > >>>>>>
>>>>> > > > >>>>>>> Hi Hongshun,
>>>>> > > > >>>>>>>
>>>>> > > > >>>>>>> The steps sound reasonable to me in general. In terms of the
>>>>> > > > >>>>>>> updated FLIP wiki, it would be good to see if we can keep the
>>>>> > > > >>>>>>> protocol simple. One alternative way to achieve this behavior is
>>>>> > > > >>>>>>> the following:
>>>>> > > > >>>>>>>
>>>>> > > > >>>>>>> 1. Upon SourceOperator startup, the SourceOperator sends
>>>>> > > > >>>>>>> ReaderRegistrationEvent with the currently assigned
>>>>> splits to
>>>>> > the
>>>>> > > > >>>>>>> enumerator. It does not add these splits to the
>>>>> SourceReader.
>>>>> > > > >>>>>>> 2. The enumerator will always use
>>>>> > > > >>>>>>> SplitEnumeratorContext.assignSplits() to assign the splits (not
>>>>> > > > >>>>>>> via the response to the ReaderRegistrationEvent; this allows async
>>>>> > > > >>>>>>> split assignment in case the enumerator wants to wait until all
>>>>> > > > >>>>>>> the readers are registered).
>>>>> > > > >>>>>>> 3. The SourceOperator will only call
>>>>> SourceReader.addSplits()
>>>>> > when
>>>>> > > > >>>>>>> it receives the AddSplitEvent from the enumerator.
>>>>> > > > >>>>>>>
>>>>> > > > >>>>>>> This protocol has a few benefits:
>>>>> > > > >>>>>>> 1. it basically allows arbitrary split reassignment upon
>>>>> > restart
>>>>> > > > >>>>>>> 2. simplicity: there is only one way to assign splits.
>>>>> > > > >>>>>>>
>>>>> > > > >>>>>>> So we only need one interface change:
>>>>> > > > >>>>>>> - add the initially assigned splits to ReaderInfo so the
>>>>> > Enumerator
>>>>> > > > >>>>>>> can access it.
>>>>> > > > >>>>>>> and one behavior change:
>>>>> > > > >>>>>>> - The SourceOperator should stop assigning splits to the
>>>>> from
>>>>> > state
>>>>> > > > >>>>>>> restoration, but
>>>>> > [message truncated...]
>>>>> >
>>>>>
>>>>
>>
