Hi Roman, thanks for the follow-up.

1. FLIP page

Thanks for your reminder, I'll create the proper FLIP page before starting 
[VOTE] thread.

2. Rescaling for OperatorSubtaskState

`refCheckpointId` is checkpoint-level metadata stored in the completed 
checkpoint — it doesn't participate in state redistribution during rescaling. 
The `TaskStateAssignment` redistributes only the state handles to new subtasks, 
while the `CompletedCheckpointStore` retains the original metadata (including 
`refCheckpointId`) for the cleaner to trace reference chains. So rescaling is 
inherently compatible.

3. Finished operators

You're right, my previous answer missed the real issue. For bounded source jobs 
(FLIP-147), if the final checkpoint is Regional and a failed Region's subtask 
misses `notifyCheckpointComplete`, their side effects (e.g., Kafka 
transactions) are never committed — that's data loss.

Solution: force a global checkpoint when the job is about to terminate. When 
all sources are exhausted, the JM will mark the next checkpoint as mandatory 
global, requiring all regions to ack. If it fails, the job retries rather than 
terminating with a partial snapshot.

4. Limitations section

I agree with you, and I'll add a section covering all limitations:

BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime 
FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot 
deletion)
Changelog state backend ->  Reject job submission
Finished operators (FLIP-147) -> Force global checkpoint when sources are 
exhausted


Thanks again for the review, looking forward to your feedback.

Regards,
Raorao


> 2026年6月19日 22:51,Roman Khachatryan <[email protected]> 写道:
> 
> Hi, thanks for your replies and sorry for the delay.
> 
> Most of my questions were answered, but I still have some concerns.
> 
>> If there are no further concerns by next Monday (June 22), I'll go ahead
> and start the [VOTE] thread for this FLIP.
> 
> Isn't the actual FLIP still missing? I only saw Google Document. Do you
> mind creating a page according to [1]?
> 
> ----------------------------------------
> 
>> 3. Checkpoint metadata layout
>> Regional Checkpoint recombines state from different checkpoint IDs. To
> track this, we add a refCheckpointId field to OperatorSubtaskState in the
> metadata, indicating which historical checkpoint a subtask’s state
> references.
> 
> Could you explain how do we find the right OperatorSubtaskState -
> especially in case of rescaling?
> Does the proposal support rescaling?
> 
>> 9. Finished operators
>> The concern is: a finished operator’s final commit notification gets
> skipped by Regional Checkpoint, and if this checkpoint is the last one, the
> operator never receives it — could this cause data loss?
>> In practice, the impact is limited:
>> ● Failed Region tasks are already gone: By the time the Regional
> Checkpoint completes, tasks in the failed Region have already been
> restarted (decline) or cancelled (timeout). There is no task left to
> receive the notification anyway.
> Checkpoint failure doesn't necessarily cause a restart (especially if this
> is limited to one region). The tasks should still be up and running.
> 
>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching
> the limit, the next checkpoint is forced to be global, ensuring all tasks
> eventually receive notifyCheckpointComplete. We can’t skip the same Region
> forever.
> maxConsecutiveFailures might not be reached for the final checkpoint.
> 
>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops
> the job gracefully, it triggers a full global snapshot, not a Regional
> Checkpoint. So the final checkpoint is always complete.
> stop-with-savepoint should be fine, yes.
> 
> To clarify, my concern is about jobs with bounded sources. In such cases,
> some subtasks might finish processing but still participate in checkpoints.
> After a successful checkpoint, they are guaranteed to get checkpoint
> completion notification - so that they can make side effects visible in
> external systems (commit Kafka transactions).
> See FLIP-147 [2]
> 
> However, with the current proposal, the job might complete with some
> subtasks/regions failing the final checkpoint unless I'm missing something.
> This is essentially data loss.
> To prevent this, the final checkpoint must always be acked by all
> subtasks/regions.
> 
> ----------------------------------------
> 
> There are quite some limitations in this proposal.
> Could you add a section describing how each of them is handled?
> 1. Reject job submission
> 2. Force all-region checkpoint
> 3. Warn in documentation
> 
>> 1. Region independence — BLOCKING/HYBRID edges
>> You’re right. Our current scope is limited to embarrassingly parallel
> regions. In typical ETL scenarios, each parallelism maps to an independent
> Region with no edges connecting them.
> 
>> 5. SharedStateRegistry — how are old states kept alive?
>> Good question. In the current design, since we only target embarrassingly
> parallel regions, there is typically no keyed state and no incremental
> state. As a result, the SharedStateRegistry is generally empty (setting
> aside File Merging and Changelog State for now, discussed on 8.), so
> keep-alive of files under the shared directory is not a concern.
> 
>> 8. FLINK-26803 and FLIP-306 compatibility
>> This is a very important point. Both features essentially merge small
> files at the job level. As Rui Fan pointed out, if the merging granularity
> is reduced to the Region level, compatibility with Regional Checkpoint
> should be achievable in theory. I think this can be deferred to future work
> — once FLINK-26803 is consolidated into FLIP-306, we can revisit and enable
> support.
> 
>> 10. NO_CLAIM mode warning
>> You’re absolutely right — this is an important reminder. After restoring
> from a Regional Checkpoint, only a successful global checkpoint guarantees
> independence from the old state. We’ll add a clear user warning in the
> documentation.
> 
>> 11. Changelog state backend — not supported
>> As mentioned earlier, our primary target is embarrassingly parallel
> regions, which typically have no keyed state and therefore no slow
> incremental state flush issues. I don’t think we need to support Changelog
> state backend for now.
> 
> ----------------------------------------
> 
>> 2. max-consecutive-failures exceeded — what exactly happens?
>> The current design says “force a global checkpoint.” To clarify the
> two-tier behavior:
>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
> next checkpoint is forced to be global.
>> ● Tier 2: If that forced global checkpoint also fails (any task
> declines), the checkpoint is aborted normally (not a job failure). The
> counter is then reset since a global checkpoint was attempted, and the next
> checkpoint cycle can try again.
>> This avoids cascading into job failure while ensuring we don’t drift
> indefinitely on historical state.
> 
> My assumption was that we would not allow this particular failed region to
> fail the checkpoint again.
> But forcing a global checkpoint works as well.
> 
>> 6. Checkpoint abort notifications & Local Recovery cleanup — new
> notification type
>> This is a very insightful point. Zihao and Gen also raised this in
> earlier discussions. The current design doesn’t address state cleanup for
> tasks in failed regions. I agree it’s necessary to introduce a new
> notification type. For tasks in failed regions, local state cleanup can be
> deferred until the next checkpoint trigger.
> Ok, this can be some future work.
> 
>> 7. Task that never acknowledges nor declines — per-region timeouts
>> This was discussed in the previous thread. Network issues may cause a
> task to neither ack nor decline in time. In such cases, we treat it as a
> checkpoint timeout: the affected tasks’ region is marked as failed, and the
> process ultimately falls through to the normalRegional Checkpoint
> processing logic.
> Ok, this can be some future work.
> 
> ----------------------------------------
> 
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> 
> [2]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> 
> Regards,
> Roman
> 
> Regards,
> Roman
> 
> 
> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> 
>> Hi all,
>> 
>> Thanks everyone for the valuable feedback. I believe all the points raised
>> above have been addressed (@Roman @Rui Fan). If there are no further
>> concerns  by next Monday (June 22), I'll go ahead and start the [VOTE]
>> thread for this FLIP.
>> 
>> For reference, the earlier related discussion can be found here:
>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
>> 
>> 
>> Please feel free to share any additional feedback before then.
>> 
>> Best Regards,
>> Raorao
>> 
>> 2026年5月27日 16:31,熊饶饶 <[email protected]> 写道:
>> 
>> Hi devs,
>> 
>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
>> Based On Pipeline Region.
>> 
>> In high-parallelism streaming jobs, a single Task's checkpoint failure
>> causes the entire global Checkpoint to abort, leading to degraded
>> checkpoint success rates and wasted compute resources (especially for GPU
>> operators).
>> 
>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the
>> framework combines the historical state of the failed Regions with the
>> current state of the healthy Regions to produce a logically complete
>> Completed Checkpoint — while preserving state consistency. The key changes
>> are:
>> 
>> 1. Snapshot Collection — Allow partial region failures; combine last
>> successful state of failed Regions with current state of normal Regions.
>> 
>> 2. State Correction — New checkpointCoordinatorForRegionFallback interface
>> for OperatorCoordinators to produce consistent snapshots against the mixed
>> view.
>> 
>> 3. Checkpoint Store — Track ref_checkpoint_id in metadata to prevent
>> premature cleanup of referenced historical checkpoints.
>> 
>> The detailed design is described in the FLIP document:
>> 
>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>> 
>> Looking forward to your feedback!
>> 
>> Best regards,
>> 
>> Raorao Xiong
>> 
>> 
>> 

Reply via email to