Hi Zakelly,

Thanks for the feedback! I completely agree — forcing region-level merging 
would significantly reduce the effectiveness of file merging, and that's not 
the right trade-off.

I think TM-level merging can work correctly with Regional Checkpoint if we 
ensure the FileMergingSnapshotManager properly handles the new checkpoint 
notification semantics. Specifically:

- The FileMergingSnapshotManager already uses a notification-based lifecycle 
(checkpoint complete/abort/subsumed) to manage physical file deletion.
- For Regional Checkpoint, we need to introduce the new notification type we
discussed earlier (notify partially-completed) so that the merging manager
knows that the checkpoint is complete but some of its segments belong to
failed regions. It must then keep the physical files that contain those
segments alive until every checkpoint referencing them is subsumed.
- This aligns with the existing design and requires only minor adjustments to 
the merging manager's notification handling.
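
To make the idea concrete, here is a rough Python sketch of the bookkeeping I
have in mind. It is a toy model under my own assumptions, not the real
FileMergingSnapshotManager API: the class MergedFileTracker, its method names,
and the file names are all hypothetical and only illustrate how a
partially-completed notification could keep shared physical files alive.

```python
from collections import defaultdict


class MergedFileTracker:
    """Toy model of TM-level merged-file lifetime (hypothetical, not Flink code)."""

    def __init__(self):
        # checkpoint id -> physical files holding that checkpoint's segments
        self.files_by_checkpoint = defaultdict(set)
        # physical file -> checkpoint ids that still reference it
        self.referrers = defaultdict(set)
        # files whose last reference went away
        self.deleted = []

    def register(self, checkpoint_id, physical_file):
        """Record that a segment of checkpoint_id lives in physical_file."""
        self.files_by_checkpoint[checkpoint_id].add(physical_file)
        self.referrers[physical_file].add(checkpoint_id)

    def notify_partially_completed(self, checkpoint_id, reused_files):
        """Assumed new notification: checkpoint_id completed, but failed
        regions reuse segments stored in reused_files of older checkpoints."""
        for physical_file in reused_files:
            self.register(checkpoint_id, physical_file)

    def notify_subsumed(self, checkpoint_id):
        """Drop checkpoint_id's references and delete unreferenced files."""
        for physical_file in self.files_by_checkpoint.pop(checkpoint_id, set()):
            self.referrers[physical_file].discard(checkpoint_id)
            if not self.referrers[physical_file]:
                del self.referrers[physical_file]
                self.deleted.append(physical_file)


tracker = MergedFileTracker()
tracker.register(1, "file-A")                      # checkpoint 1: all regions merged into file-A
tracker.register(2, "file-B")                      # checkpoint 2: only the healthy region wrote state
tracker.notify_partially_completed(2, ["file-A"])  # failed region reuses its segment in file-A
tracker.notify_subsumed(1)                         # file-A must survive: checkpoint 2 references it
```

In this sketch file-A is only deleted once checkpoint 2 is subsumed as well,
which is the keep-alive behavior described above, and merging still happens
across subtasks on the same TM.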

The previous suggestion of region-level merging was overly conservative — glad 
you pointed out the better approach.

Also good to know about the plan to cover channel state in FLIP-306. That will 
simplify the compatibility matrix significantly.

Best Regards,
Raorao

> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> 
> Hi Raorao,
> 
> Good to see this proposal and +1 for the direction. Sorry for joining the
> discussion late. I have seen many constructive suggestions that largely
> align with my thoughts, but I still have one concern:
> 
> I'm one of the authors of FLIP-306 and I'm not in favor of region-level
> merging. IIUC, region-level merge files severely limit the effectiveness of
> merging, as merging cannot happen between subtasks. I think it should still
> be possible to perform TM-level merge. The only thing we should do is to
> keep previous checkpoint files alive when a region checkpoint occurs. This
> does not conflict with the current design. We only need to ensure
> that the new behavior of checkpoint notifications is compatible with
> FLIP-306, possibly with some minor adjustments to the merging manager.
> 
> And BTW to @Rui, we still need more work to let FLIP-306 cover channel
> state and deprecate FLINK-26803, and I will look into this soon.
> 
> 
> Best,
> Zakelly
> 
> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> 
>> Hi Roman, thanks for the follow-up.
>> 
>> 1. FLIP page
>> 
>> Thanks for the reminder. I'll create a proper FLIP page before starting
>> the [VOTE] thread.
>> 
>> 2. Rescaling for OperatorSubtaskState
>> 
>> `refCheckpointId` is checkpoint-level metadata stored in the completed
>> checkpoint — it doesn't participate in state redistribution during
>> rescaling. The `TaskStateAssignment` redistributes only the state handles
>> to new subtasks, while the `CompletedCheckpointStore` retains the original
>> metadata (including `refCheckpointId`) for the cleaner to trace reference
>> chains. So rescaling is inherently compatible.
>> 
>> 3. Finished operators
>> 
>> You're right, my previous answer missed the real issue. For bounded source
>> jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's
>> subtasks miss `notifyCheckpointComplete`, their side effects (e.g., Kafka
>> transactions) are never committed, which is data loss.
>> 
>> Solution: force a global checkpoint when the job is about to terminate.
>> When all sources are exhausted, the JM will mark the next checkpoint as
>> mandatory global, requiring all regions to ack. If it fails, the job
>> retries rather than terminating with a partial snapshot.
>> 
>> 4. Limitations section
>> 
>> I agree with you, and I'll add a section covering all limitations:
>> 
>> - BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
>> - FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
>> - NO_CLAIM restore mode -> Warn (require global checkpoint before
>>   snapshot deletion)
>> - Changelog state backend -> Reject job submission
>> - Finished operators (FLIP-147) -> Force global checkpoint when sources
>>   are exhausted
>> 
>> 
>> Thanks again for the review, looking forward to your feedback.
>> 
>> Regards,
>> Raorao
>> 
>> 
>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
>>> 
>>> Hi, thanks for your replies and sorry for the delay.
>>> 
>>> Most of my questions were answered, but I still have some concerns.
>>> 
>>>> If there are no further concerns by next Monday (June 22), I'll go ahead
>>>> and start the [VOTE] thread for this FLIP.
>>> 
>>> Isn't the actual FLIP still missing? I only saw Google Document. Do you
>>> mind creating a page according to [1]?
>>> 
>>> ----------------------------------------
>>> 
>>>> 3. Checkpoint metadata layout
>>>> Regional Checkpoint recombines state from different checkpoint IDs. To
>>>> track this, we add a refCheckpointId field to OperatorSubtaskState in the
>>>> metadata, indicating which historical checkpoint a subtask’s state
>>>> references.
>>> 
>>> Could you explain how we find the right OperatorSubtaskState,
>>> especially in case of rescaling?
>>> Does the proposal support rescaling?
>>> 
>>>> 9. Finished operators
>>>> The concern is: a finished operator’s final commit notification gets
>>>> skipped by Regional Checkpoint, and if this checkpoint is the last one,
>>>> the operator never receives it. Could this cause data loss?
>>>> In practice, the impact is limited:
>>>> ● Failed Region tasks are already gone: By the time the Regional
>>>> Checkpoint completes, tasks in the failed Region have already been
>>>> restarted (decline) or cancelled (timeout). There is no task left to
>>>> receive the notification anyway.
>>> Checkpoint failure doesn't necessarily cause a restart (especially if
>>> this is limited to one region). The tasks should still be up and running.
>>> 
>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching
>>>> the limit, the next checkpoint is forced to be global, ensuring all tasks
>>>> eventually receive notifyCheckpointComplete. We can’t skip the same
>>>> Region forever.
>>> maxConsecutiveFailures might not be reached for the final checkpoint.
>>> 
>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops
>>>> the job gracefully, it triggers a full global snapshot, not a Regional
>>>> Checkpoint. So the final checkpoint is always complete.
>>> stop-with-savepoint should be fine, yes.
>>> 
>>> To clarify, my concern is about jobs with bounded sources. In such cases,
>>> some subtasks might finish processing but still participate in
>>> checkpoints. After a successful checkpoint, they are guaranteed to get a
>>> checkpoint completion notification, so that they can make side effects
>>> visible in external systems (commit Kafka transactions).
>>> See FLIP-147 [2]
>>> 
>>> However, with the current proposal, the job might complete with some
>>> subtasks/regions failing the final checkpoint, unless I'm missing
>>> something. This is essentially data loss.
>>> To prevent this, the final checkpoint must always be acked by all
>>> subtasks/regions.
>>> 
>>> ----------------------------------------
>>> 
>>> There are quite some limitations in this proposal.
>>> Could you add a section describing how each of them is handled?
>>> 1. Reject job submission
>>> 2. Force all-region checkpoint
>>> 3. Warn in documentation
>>> 
>>>> 1. Region independence — BLOCKING/HYBRID edges
>>>> You’re right. Our current scope is limited to embarrassingly parallel
>>>> regions. In typical ETL scenarios, each parallelism maps to an
>>>> independent Region with no edges connecting them.
>>> 
>>>> 5. SharedStateRegistry — how are old states kept alive?
>>>> Good question. In the current design, since we only target
>>>> embarrassingly parallel regions, there is typically no keyed state and no
>>>> incremental state. As a result, the SharedStateRegistry is generally
>>>> empty (setting aside File Merging and Changelog State for now, which are
>>>> discussed in point 8), so keep-alive of files under the shared directory
>>>> is not a concern.
>>> 
>>>> 8. FLINK-26803 and FLIP-306 compatibility
>>>> This is a very important point. Both features essentially merge small
>>>> files at the job level. As Rui Fan pointed out, if the merging
>>>> granularity is reduced to the Region level, compatibility with Regional
>>>> Checkpoint should be achievable in theory. I think this can be deferred
>>>> to future work: once FLINK-26803 is consolidated into FLIP-306, we can
>>>> revisit and enable support.
>>> 
>>>> 10. NO_CLAIM mode warning
>>>> You’re absolutely right, this is an important reminder. After restoring
>>>> from a Regional Checkpoint, only a successful global checkpoint
>>>> guarantees independence from the old state. We’ll add a clear user
>>>> warning in the documentation.
>>> 
>>>> 11. Changelog state backend — not supported
>>>> As mentioned earlier, our primary target is embarrassingly parallel
>>>> regions, which typically have no keyed state and therefore no slow
>>>> incremental state flush issues. I don’t think we need to support the
>>>> Changelog state backend for now.
>>> 
>>> ----------------------------------------
>>> 
>>>> 2. max-consecutive-failures exceeded — what exactly happens?
>>>> The current design says “force a global checkpoint.” To clarify the
>>>> two-tier behavior:
>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
>>>> next checkpoint is forced to be global.
>>>> ● Tier 2: If that forced global checkpoint also fails (any task
>>>> declines), the checkpoint is aborted normally (not a job failure). The
>>>> counter is then reset since a global checkpoint was attempted, and the
>>>> next checkpoint cycle can try again.
>>>> This avoids cascading into job failure while ensuring we don’t drift
>>>> indefinitely on historical state.
>>> 
>>> My assumption was that we would not allow this particular failed region
>>> to fail the checkpoint again.
>>> But forcing a global checkpoint works as well.
>>> 
>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new
>>>> notification type
>>>> This is a very insightful point. Zihao and Gen also raised this in
>>>> earlier discussions. The current design doesn’t address state cleanup for
>>>> tasks in failed regions. I agree it’s necessary to introduce a new
>>>> notification type. For tasks in failed regions, local state cleanup can
>>>> be deferred until the next checkpoint trigger.
>>> Ok, this can be some future work.
>>> 
>>>> 7. Task that never acknowledges nor declines — per-region timeouts
>>>> This was discussed in the previous thread. Network issues may cause a
>>>> task to neither ack nor decline in time. In such cases, we treat it as a
>>>> checkpoint timeout: the affected tasks’ region is marked as failed, and
>>>> the process ultimately falls through to the normal Regional Checkpoint
>>>> processing logic.
>>> Ok, this can be some future work.
>>> 
>>> ----------------------------------------
>>> 
>>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
>>> 
>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>> 
>>> Regards,
>>> Roman
>>> 
>>> 
>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
>>> 
>>>> Hi all,
>>>> 
>>>> Thanks everyone for the valuable feedback. I believe all the points
>>>> raised above have been addressed (@Roman @Rui Fan). If there are no
>>>> further concerns by next Monday (June 22), I'll go ahead and start the
>>>> [VOTE] thread for this FLIP.
>>>> 
>>>> For reference, the earlier related discussion can be found here:
>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
>>>> 
>>>> 
>>>> Please feel free to share any additional feedback before then.
>>>> 
>>>> Best Regards,
>>>> Raorao
>>>> 
>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>>>> 
>>>> Hi devs,
>>>> 
>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
>>>> Based On Pipeline Region.
>>>> 
>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
>>>> causes the entire global Checkpoint to abort, leading to degraded
>>>> checkpoint success rates and wasted compute resources (especially for
>>>> GPU operators).
>>>> 
>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint,
>>>> the framework combines the historical state of the failed Regions with
>>>> the current state of the healthy Regions to produce a logically complete
>>>> Completed Checkpoint while preserving state consistency. The key changes
>>>> are:
>>>> 
>>>> 1. Snapshot Collection: Allow partial region failures; combine last
>>>> successful state of failed Regions with current state of normal Regions.
>>>> 
>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback
>>>> interface for OperatorCoordinators to produce consistent snapshots
>>>> against the mixed view.
>>>> 
>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
>>>> premature cleanup of referenced historical checkpoints.
>>>> 
>>>> The detailed design is described in the FLIP document:
>>>> 
>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>>>> 
>>>> Looking forward to your feedback!
>>>> 
>>>> Best regards,
>>>> 
>>>> Raorao Xiong
>>>> 
>>>> 
>>>> 
>> 
>> 
