Hi Zakelly,

Thanks for the feedback! I completely agree: forcing region-level merging would significantly reduce the effectiveness of file merging, and that's not the right trade-off.

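To make the keep-alive idea concrete, here is a rough Java sketch of per-file reference tracking under TM-level merging. It is only an illustration: `MergedFileTracker` and its methods (`register`, `onPartiallyCompleted`, `onSubsumed`, `isAlive`) are hypothetical names, not part of FLIP-306 or the existing FileMergingSnapshotManager API.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch only; not the FLIP-306 FileMergingSnapshotManager API. */
final class MergedFileTracker {
    // physical file -> IDs of retained checkpoints that reference a segment in it
    private final Map<String, Set<Long>> refs = new HashMap<>();

    /** Record that checkpointId wrote (or references) a segment in physicalFile. */
    void register(String physicalFile, long checkpointId) {
        refs.computeIfAbsent(physicalFile, f -> new HashSet<>()).add(checkpointId);
    }

    /**
     * Assumed "partially completed" notification: the checkpoint reuses segments
     * of failed regions from older files, so it takes a reference on those files.
     */
    void onPartiallyCompleted(long checkpointId, Collection<String> reusedFiles) {
        for (String file : reusedFiles) {
            register(file, checkpointId);
        }
    }

    /** Drop the subsumed checkpoint's references; return files that may be deleted. */
    Set<String> onSubsumed(long checkpointId) {
        Set<String> deletable = new HashSet<>();
        Iterator<Map.Entry<String, Set<Long>>> it = refs.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Set<Long>> entry = it.next();
            entry.getValue().remove(checkpointId);
            if (entry.getValue().isEmpty()) {
                deletable.add(entry.getKey());
                it.remove();
            }
        }
        return deletable;
    }

    /** A file is alive while at least one retained checkpoint references it. */
    boolean isAlive(String physicalFile) {
        return refs.containsKey(physicalFile);
    }
}
```

With this kind of bookkeeping, a physical file shared by several subtasks is deleted only once the last checkpoint referencing any of its segments has been subsumed, so merging would not need to be restricted to a single region.
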
I think TM-level merging can work correctly with Regional Checkpoint if we ensure the FileMergingSnapshotManager properly handles the new checkpoint notification semantics. Specifically:

- The FileMergingSnapshotManager already uses a notification-based lifecycle (checkpoint complete/abort/subsumed) to manage physical file deletion.
- For Regional Checkpoint, we need to introduce the new notification type we discussed earlier (notify partially-completed) so that the merging manager knows that this checkpoint is complete but some segments belong to failed regions, and that it must keep the physical files containing those segments alive until the referencing checkpoints are subsumed.
- This aligns with the existing design and requires only minor adjustments to the merging manager's notification handling.

The previous suggestion of region-level merging was overly conservative; glad you pointed out the better approach.

Also good to know about the plan to cover channel state in FLIP-306. That will simplify the compatibility matrix significantly.

Best Regards,
Raorao

> On June 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
>
> Hi Raorao,
>
> Good to see this proposal and +1 for the direction. Sorry for joining the
> discussion late. I have seen many constructive suggestions that largely
> align with my thoughts, but I still have one concern:
>
> I'm one of the authors of FLIP-306 and I'm not in favor of region-level
> merging. IIUC, region-level merge files severely limit the effectiveness of
> merging, as merging cannot happen between subtasks. I think it should still
> be possible to perform TM-level merge. The only thing we should do is to
> keep previous checkpoint files alive when a region checkpoint occurs. This
> does not conflict with the current design. It is only necessary to ensure
> that the new behavior of checkpoint notifications is compatible with
> FLIP-306, or to make some minor adjustments to the merging manager.
>
> And BTW to @Rui, we still need more work to let FLIP-306 cover channel
> state and deprecate FLINK-26803, and I will look into this soon.
>
> Best,
> Zakelly
>
> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
>
>> Hi Roman, thanks for the follow-up.
>>
>> 1. FLIP page
>>
>> Thanks for the reminder, I'll create the proper FLIP page before starting
>> the [VOTE] thread.
>>
>> 2. Rescaling for OperatorSubtaskState
>>
>> `refCheckpointId` is checkpoint-level metadata stored in the completed
>> checkpoint; it doesn't participate in state redistribution during
>> rescaling. The `TaskStateAssignment` redistributes only the state handles
>> to new subtasks, while the `CompletedCheckpointStore` retains the original
>> metadata (including `refCheckpointId`) for the cleaner to trace reference
>> chains. So rescaling is inherently compatible.
>>
>> 3. Finished operators
>>
>> You're right, my previous answer missed the real issue. For bounded source
>> jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's
>> subtasks miss `notifyCheckpointComplete`, their side effects (e.g., Kafka
>> transactions) are never committed, which is data loss.
>>
>> Solution: force a global checkpoint when the job is about to terminate.
>> When all sources are exhausted, the JM will mark the next checkpoint as
>> mandatory global, requiring all regions to ack. If it fails, the job
>> retries rather than terminating with a partial snapshot.
>>
>> 4. Limitations section
>>
>> I agree with you, and I'll add a section covering all limitations:
>>
>> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
>> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
>> NO_CLAIM restore mode -> Warn (require global checkpoint before snapshot deletion)
>> Changelog state backend -> Reject job submission
>> Finished operators (FLIP-147) -> Force global checkpoint when sources are exhausted
>>
>> Thanks again for the review, looking forward to your feedback.
>>
>> Regards,
>> Raorao
>>
>>> On June 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
>>>
>>> Hi, thanks for your replies and sorry for the delay.
>>>
>>> Most of my questions were answered, but I still have some concerns.
>>>
>>>> If there are no further concerns by next Monday (June 22), I'll go ahead
>>>> and start the [VOTE] thread for this FLIP.
>>>
>>> Isn't the actual FLIP still missing? I only saw a Google Document. Do you
>>> mind creating a page according to [1]?
>>>
>>> ----------------------------------------
>>>
>>>> 3. Checkpoint metadata layout
>>>> Regional Checkpoint recombines state from different checkpoint IDs. To
>>>> track this, we add a refCheckpointId field to OperatorSubtaskState in the
>>>> metadata, indicating which historical checkpoint a subtask’s state
>>>> references.
>>>
>>> Could you explain how we find the right OperatorSubtaskState,
>>> especially in case of rescaling?
>>> Does the proposal support rescaling?
>>>
>>>> 9. Finished operators
>>>> The concern is: a finished operator’s final commit notification gets
>>>> skipped by Regional Checkpoint, and if this checkpoint is the last one,
>>>> the operator never receives it. Could this cause data loss?
>>>> In practice, the impact is limited:
>>>> ● Failed Region tasks are already gone: By the time the Regional
>>>> Checkpoint completes, tasks in the failed Region have already been
>>>> restarted (decline) or cancelled (timeout).
>>>> There is no task left to receive the notification anyway.
>>> Checkpoint failure doesn't necessarily cause a restart (especially if this
>>> is limited to one region). The tasks should still be up and running.
>>>
>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching
>>>> the limit, the next checkpoint is forced to be global, ensuring all tasks
>>>> eventually receive notifyCheckpointComplete. We can’t skip the same
>>>> Region forever.
>>> maxConsecutiveFailures might not be reached for the final checkpoint.
>>>
>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops
>>>> the job gracefully, it triggers a full global snapshot, not a Regional
>>>> Checkpoint. So the final checkpoint is always complete.
>>> stop-with-savepoint should be fine, yes.
>>>
>>> To clarify, my concern is about jobs with bounded sources. In such cases,
>>> some subtasks might finish processing but still participate in checkpoints.
>>> After a successful checkpoint, they are guaranteed to get the checkpoint
>>> completion notification, so that they can make side effects visible in
>>> external systems (commit Kafka transactions).
>>> See FLIP-147 [2]
>>>
>>> However, with the current proposal, the job might complete with some
>>> subtasks/regions failing the final checkpoint unless I'm missing something.
>>> This is essentially data loss.
>>> To prevent this, the final checkpoint must always be acked by all
>>> subtasks/regions.
>>>
>>> ----------------------------------------
>>>
>>> There are quite a few limitations in this proposal.
>>> Could you add a section describing how each of them is handled?
>>> 1. Reject job submission
>>> 2. Force all-region checkpoint
>>> 3. Warn in documentation
>>>
>>>> 1. Region independence: BLOCKING/HYBRID edges
>>>> You’re right. Our current scope is limited to embarrassingly parallel
>>>> regions.
>>>> In typical ETL scenarios, each parallel subtask maps to an independent
>>>> Region with no edges connecting them.
>>>
>>>> 5. SharedStateRegistry: how are old states kept alive?
>>>> Good question. In the current design, since we only target embarrassingly
>>>> parallel regions, there is typically no keyed state and no incremental
>>>> state. As a result, the SharedStateRegistry is generally empty (setting
>>>> aside File Merging and Changelog State for now, discussed in 8.), so
>>>> keep-alive of files under the shared directory is not a concern.
>>>
>>>> 8. FLINK-26803 and FLIP-306 compatibility
>>>> This is a very important point. Both features essentially merge small
>>>> files at the job level. As Rui Fan pointed out, if the merging granularity
>>>> is reduced to the Region level, compatibility with Regional Checkpoint
>>>> should be achievable in theory. I think this can be deferred to future
>>>> work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and
>>>> enable support.
>>>
>>>> 10. NO_CLAIM mode warning
>>>> You’re absolutely right, this is an important reminder. After restoring
>>>> from a Regional Checkpoint, only a successful global checkpoint guarantees
>>>> independence from the old state. We’ll add a clear user warning in the
>>>> documentation.
>>>
>>>> 11. Changelog state backend: not supported
>>>> As mentioned earlier, our primary target is embarrassingly parallel
>>>> regions, which typically have no keyed state and therefore no slow
>>>> incremental state flush issues. I don’t think we need to support the
>>>> Changelog state backend for now.
>>>
>>> ----------------------------------------
>>>
>>>> 2. max-consecutive-failures exceeded: what exactly happens?
>>>> The current design says “force a global checkpoint.” To clarify the
>>>> two-tier behavior:
>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
>>>> next checkpoint is forced to be global.
>>>> ● Tier 2: If that forced global checkpoint also fails (any task
>>>> declines), the checkpoint is aborted normally (not a job failure). The
>>>> counter is then reset since a global checkpoint was attempted, and the
>>>> next checkpoint cycle can try again.
>>>> This avoids cascading into job failure while ensuring we don’t drift
>>>> indefinitely on historical state.
>>>
>>> My assumption was that we would not allow this particular failed region to
>>> fail the checkpoint again.
>>> But forcing a global checkpoint works as well.
>>>
>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new
>>>> notification type
>>>> This is a very insightful point. Zihao and Gen also raised this in
>>>> earlier discussions. The current design doesn’t address state cleanup for
>>>> tasks in failed regions. I agree it’s necessary to introduce a new
>>>> notification type. For tasks in failed regions, local state cleanup can
>>>> be deferred until the next checkpoint trigger.
>>> Ok, this can be some future work.
>>>
>>>> 7. Task that never acknowledges nor declines: per-region timeouts
>>>> This was discussed in the previous thread. Network issues may cause a
>>>> task to neither ack nor decline in time. In such cases, we treat it as a
>>>> checkpoint timeout: the affected tasks’ region is marked as failed, and
>>>> the process ultimately falls through to the normal Regional Checkpoint
>>>> processing logic.
>>> Ok, this can be some future work.
>>>
>>> ----------------------------------------
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
>>>
>>> [2]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>
>>> Regards,
>>> Roman
>>>
>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> Thanks everyone for the valuable feedback.
>>>> I believe all the points raised above have been addressed (@Roman
>>>> @Rui Fan). If there are no further concerns by next Monday (June 22),
>>>> I'll go ahead and start the [VOTE] thread for this FLIP.
>>>>
>>>> For reference, the earlier related discussion can be found here:
>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
>>>>
>>>> Please feel free to share any additional feedback before then.
>>>>
>>>> Best Regards,
>>>> Raorao
>>>>
>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>>>>
>>>> Hi devs,
>>>>
>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
>>>> Based On Pipeline Region.
>>>>
>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
>>>> causes the entire global Checkpoint to abort, leading to degraded
>>>> checkpoint success rates and wasted compute resources (especially for
>>>> GPU operators).
>>>>
>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint,
>>>> the framework combines the historical state of the failed Regions with
>>>> the current state of the healthy Regions to produce a logically complete
>>>> Completed Checkpoint, while preserving state consistency. The key
>>>> changes are:
>>>>
>>>> 1. Snapshot Collection: Allow partial region failures; combine the last
>>>> successful state of failed Regions with the current state of normal
>>>> Regions.
>>>>
>>>> 2. State Correction: New checkpointCoordinatorForRegionFallback
>>>> interface for OperatorCoordinators to produce consistent snapshots
>>>> against the mixed view.
>>>>
>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
>>>> premature cleanup of referenced historical checkpoints.
>>>>
>>>> The detailed design is described in the FLIP document:
>>>>
>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>>>>
>>>> Looking forward to your feedback!
>>>>
>>>> Best regards,
>>>>
>>>> Raorao Xiong
