Hi Raorao,

Good to see this proposal and +1 for the direction. Sorry for joining the
discussion late. I have seen many constructive suggestions that largely
align with my thoughts, but I still have one concern:

I'm one of the authors of FLIP-306 and I'm not in favor of region-level
merging. IIUC, region-level merging severely limits the effectiveness of
file merging, as files cannot be merged across subtasks. I think it should
still be possible to perform TM-level merging. The only thing we need to do
is keep previous checkpoint files alive when a region checkpoint occurs.
This does not conflict with the current design. We only need to ensure that
the new checkpoint notification behavior is compatible with FLIP-306, or
make some minor adjustments to the merging manager.
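
To make the keep-alive idea concrete, here is a rough sketch of reference
counting over merged physical files. All names (MergedFileTracker,
addReference, inheritReferences, subsume) are hypothetical and are not the
FLIP-306 classes or API. It assumes that a regional checkpoint simply
re-references the files of the checkpoint it falls back to, which is
coarser than what a real implementation would likely do:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch only; not the FLIP-306 merging manager. */
final class MergedFileTracker {
    // checkpointId -> merged physical files referenced by that checkpoint
    private final Map<Long, Set<String>> filesByCheckpoint = new HashMap<>();
    // physical file -> number of retained checkpoints that reference it
    private final Map<String, Integer> refCounts = new HashMap<>();

    /** Registers a file for a checkpoint; idempotent per (checkpoint, file). */
    void addReference(long checkpointId, String file) {
        Set<String> files =
                filesByCheckpoint.computeIfAbsent(checkpointId, k -> new HashSet<>());
        if (files.add(file)) {
            refCounts.merge(file, 1, Integer::sum);
        }
    }

    /**
     * A regional checkpoint reuses state of refCheckpointId for its failed
     * regions. Assumption: we conservatively keep all of its files alive.
     */
    void inheritReferences(long checkpointId, long refCheckpointId) {
        Set<String> referenced = filesByCheckpoint.get(refCheckpointId);
        if (referenced == null) {
            return;
        }
        // Copy first so we never iterate a set while registering into it.
        for (String file : new HashSet<>(referenced)) {
            addReference(checkpointId, file);
        }
    }

    /** Drops a subsumed checkpoint and returns the files that are now unreferenced. */
    Set<String> subsume(long checkpointId) {
        Set<String> deletable = new HashSet<>();
        Set<String> files = filesByCheckpoint.remove(checkpointId);
        if (files == null) {
            return deletable;
        }
        for (String file : files) {
            int remaining = refCounts.merge(file, -1, Integer::sum);
            if (remaining == 0) {
                refCounts.remove(file);
                deletable.add(file);
            }
        }
        return deletable;
    }
}
```

With this shape, subsuming checkpoint 1 after a regional checkpoint 2 has
inherited its files deletes nothing; the files only become deletable once
checkpoint 2 itself is subsumed.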

And BTW to @Rui, we still need more work to let FLIP-306 cover channel
state and deprecate FLINK-26803, and I will look into this soon.


Best,
Zakelly

On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:

> Hi Roman, thanks for the follow-up.
>
> 1. FLIP page
>
> Thanks for the reminder. I'll create a proper FLIP page before starting
> the [VOTE] thread.
>
> 2. Rescaling for OperatorSubtaskState
>
> `refCheckpointId` is checkpoint-level metadata stored in the completed
> checkpoint — it doesn't participate in state redistribution during
> rescaling. The `TaskStateAssignment` redistributes only the state handles
> to new subtasks, while the `CompletedCheckpointStore` retains the original
> metadata (including `refCheckpointId`) for the cleaner to trace reference
> chains. So rescaling is inherently compatible.
>
> 3. Finished operators
>
> You're right, my previous answer missed the real issue. For bounded source
> jobs (FLIP-147), if the final checkpoint is Regional and a subtask in a
> failed Region misses `notifyCheckpointComplete`, its side effects (e.g.,
> Kafka transactions) are never committed, which is data loss.
>
> Solution: force a global checkpoint when the job is about to terminate.
> When all sources are exhausted, the JM will mark the next checkpoint as
> mandatory global, requiring all regions to ack. If it fails, the job
> retries rather than terminating with a partial snapshot.
>
> 4. Limitations section
>
> I agree with you, and I'll add a section covering all limitations:
>
> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot
>   deletion)
> Changelog state backend -> Reject job submission
> Finished operators (FLIP-147) -> Force global checkpoint when sources are
>   exhausted
>
>
> Thanks again for the review, looking forward to your feedback.
>
> Regards,
> Raorao
>
>
> > On June 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> >
> > Hi, thanks for your replies and sorry for the delay.
> >
> > Most of my questions were answered, but I still have some concerns.
> >
> >> If there are no further concerns by next Monday (June 22), I'll go
> >> ahead and start the [VOTE] thread for this FLIP.
> >
> > Isn't the actual FLIP still missing? I only saw a Google Document. Would
> > you mind creating a page according to [1]?
> >
> > ----------------------------------------
> >
> >> 3. Checkpoint metadata layout
> >> Regional Checkpoint recombines state from different checkpoint IDs. To
> >> track this, we add a refCheckpointId field to OperatorSubtaskState in
> >> the metadata, indicating which historical checkpoint a subtask’s state
> >> references.
> >
> > Could you explain how we find the right OperatorSubtaskState,
> > especially in case of rescaling?
> > Does the proposal support rescaling?
> >
> >> 9. Finished operators
> >> The concern is: a finished operator’s final commit notification gets
> >> skipped by Regional Checkpoint, and if this checkpoint is the last
> >> one, the operator never receives it. Could this cause data loss?
> >> In practice, the impact is limited:
> >> ● Failed Region tasks are already gone: By the time the Regional
> >> Checkpoint completes, tasks in the failed Region have already been
> >> restarted (decline) or cancelled (timeout). There is no task left to
> >> receive the notification anyway.
> >
> > Checkpoint failure doesn't necessarily cause a restart (especially if
> > this is limited to one region). The tasks should still be up and
> > running.
> >
> >> ● maxConsecutiveFailures guarantees a global checkpoint: After
> >> reaching the limit, the next checkpoint is forced to be global,
> >> ensuring all tasks eventually receive notifyCheckpointComplete. We
> >> can’t skip the same Region forever.
> >
> > maxConsecutiveFailures might not be reached for the final checkpoint.
> >
> >> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops
> >> the job gracefully, it triggers a full global snapshot, not a Regional
> >> Checkpoint. So the final checkpoint is always complete.
> >
> > stop-with-savepoint should be fine, yes.
> >
> > To clarify, my concern is about jobs with bounded sources. In such cases,
> > some subtasks might finish processing but still participate in
> > checkpoints. After a successful checkpoint, they are guaranteed to get a
> > checkpoint completion notification, so that they can make side effects
> > visible in external systems (e.g., commit Kafka transactions).
> > See FLIP-147 [2].
> >
> > However, with the current proposal, the job might complete with some
> > subtasks/regions failing the final checkpoint, unless I'm missing
> > something. This is essentially data loss.
> > To prevent this, the final checkpoint must always be acked by all
> > subtasks/regions.
> >
> > ----------------------------------------
> >
> > There are quite a few limitations in this proposal.
> > Could you add a section describing how each of them is handled, e.g.:
> > 1. Reject job submission
> > 2. Force all-region checkpoint
> > 3. Warn in documentation
> >
> >> 1. Region independence: BLOCKING/HYBRID edges
> >> You’re right. Our current scope is limited to embarrassingly parallel
> >> regions. In typical ETL scenarios, each parallelism maps to an
> >> independent Region with no edges connecting them.
> >
> >> 5. SharedStateRegistry: how are old states kept alive?
> >> Good question. In the current design, since we only target
> >> embarrassingly parallel regions, there is typically no keyed state and
> >> no incremental state. As a result, the SharedStateRegistry is generally
> >> empty (setting aside File Merging and Changelog State for now, which
> >> are discussed in item 8), so keep-alive of files under the shared
> >> directory is not a concern.
> >
> >> 8. FLINK-26803 and FLIP-306 compatibility
> >> This is a very important point. Both features essentially merge small
> >> files at the job level. As Rui Fan pointed out, if the merging
> >> granularity is reduced to the Region level, compatibility with Regional
> >> Checkpoint should be achievable in theory. I think this can be deferred
> >> to future work: once FLINK-26803 is consolidated into FLIP-306, we can
> >> revisit and enable support.
> >
> >> 10. NO_CLAIM mode warning
> >> You’re absolutely right, this is an important reminder. After restoring
> >> from a Regional Checkpoint, only a successful global checkpoint
> >> guarantees independence from the old state. We’ll add a clear user
> >> warning in the documentation.
> >
> >> 11. Changelog state backend: not supported
> >> As mentioned earlier, our primary target is embarrassingly parallel
> >> regions, which typically have no keyed state and therefore no slow
> >> incremental state flush issues. I don’t think we need to support the
> >> Changelog state backend for now.
> >
> > ----------------------------------------
> >
> >> 2. max-consecutive-failures exceeded: what exactly happens?
> >> The current design says “force a global checkpoint.” To clarify the
> >> two-tier behavior:
> >> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
> >> next checkpoint is forced to be global.
> >> ● Tier 2: If that forced global checkpoint also fails (any task
> >> declines), the checkpoint is aborted normally (not a job failure). The
> >> counter is then reset since a global checkpoint was attempted, and the
> >> next checkpoint cycle can try again.
> >> This avoids cascading into job failure while ensuring we don’t drift
> >> indefinitely on historical state.
> >
> > My assumption was that we would not allow this particular failed region
> > to fail the checkpoint again.
> > But forcing a global checkpoint works as well.
> >
> >> 6. Checkpoint abort notifications & Local Recovery cleanup: new
> >> notification type
> >> This is a very insightful point. Zihao and Gen also raised this in
> >> earlier discussions. The current design doesn’t address state cleanup
> >> for tasks in failed regions. I agree it’s necessary to introduce a new
> >> notification type. For tasks in failed regions, local state cleanup can
> >> be deferred until the next checkpoint trigger.
> >
> > Ok, this can be some future work.
> >
> >> 7. Task that never acknowledges nor declines: per-region timeouts
> >> This was discussed in the previous thread. Network issues may cause a
> >> task to neither ack nor decline in time. In such cases, we treat it as
> >> a checkpoint timeout: the affected tasks’ region is marked as failed,
> >> and the process ultimately falls through to the normal Regional
> >> Checkpoint processing logic.
> >
> > Ok, this can be some future work.
> >
> > ----------------------------------------
> >
> > [1]
> > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >
> > [2]
> > https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >
> > Regards,
> > Roman
> >
> >
> > On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> >
> >> Hi all,
> >>
> >> Thanks everyone for the valuable feedback. I believe all the points
> >> raised above have been addressed (@Roman @Rui Fan). If there are no
> >> further concerns by next Monday (June 22), I'll go ahead and start the
> >> [VOTE] thread for this FLIP.
> >>
> >> For reference, the earlier related discussion can be found here:
> >> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>
> >>
> >> Please feel free to share any additional feedback before then.
> >>
> >> Best Regards,
> >> Raorao
> >>
> >> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >>
> >> Hi devs,
> >>
> >> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> >> Based On Pipeline Region.
> >>
> >> In high-parallelism streaming jobs, a single Task's checkpoint failure
> >> causes the entire global Checkpoint to abort, leading to degraded
> >> checkpoint success rates and wasted compute resources (especially for
> >> GPU operators).
> >>
> >> We propose Regional Checkpoint: when some Regions fail to checkpoint,
> >> the framework combines the historical state of the failed Regions with
> >> the current state of the healthy Regions to produce a logically
> >> complete Completed Checkpoint, while preserving state consistency. The
> >> key changes are:
> >>
> >> 1. Snapshot Collection: Allow partial region failures; combine last
> >> successful state of failed Regions with current state of normal Regions.
> >>
> >> 2. State Correction: New checkpointCoordinatorForRegionFallback
> >> interface for OperatorCoordinators to produce consistent snapshots
> >> against the mixed view.
> >>
> >> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
> >> premature cleanup of referenced historical checkpoints.
> >>
> >> The detailed design is described in the FLIP document:
> >>
> >> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>
> >> Looking forward to your feedback!
> >>
> >> Best regards,
> >>
> >> Raorao Xiong
> >>
> >>
> >>
>
>
