Hi Raorao,

Good to see this proposal, and +1 for the direction. Sorry for joining the discussion late. I have seen many constructive suggestions that largely align with my thoughts, but I still have one concern:

I'm one of the authors of FLIP-306, and I'm not in favor of region-level merging. IIUC, merging files at the region level would severely limit the effectiveness of merging, since files could no longer be merged across subtasks. I think TM-level merging should still be possible. The only thing we need to do is keep the files of previous checkpoints alive when a regional checkpoint occurs. This does not conflict with the current design; we only need to ensure that the new checkpoint notification behavior is compatible with FLIP-306, or make some minor adjustments to the merging manager.

BTW, to @Rui: we still need more work to let FLIP-306 cover channel state and deprecate FLINK-26803, and I will look into this soon.

Best,
Zakelly

On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 wrote:

> Hi Roman, thanks for the follow-up.
>
> 1. FLIP page
>
> Thanks for the reminder. I'll create the proper FLIP page before starting the [VOTE] thread.
>
> 2. Rescaling for OperatorSubtaskState
>
> `refCheckpointId` is checkpoint-level metadata stored in the completed checkpoint; it doesn't participate in state redistribution during rescaling. The `TaskStateAssignment` redistributes only the state handles to new subtasks, while the `CompletedCheckpointStore` retains the original metadata (including `refCheckpointId`) for the cleaner to trace reference chains. So rescaling is inherently compatible.
>
> 3. Finished operators
>
> You're right, my previous answer missed the real issue. For bounded-source jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's subtasks miss `notifyCheckpointComplete`, their side effects (e.g., Kafka transactions) are never committed. That's data loss.
>
> Solution: force a global checkpoint when the job is about to terminate. When all sources are exhausted, the JM will mark the next checkpoint as mandatory global, requiring all regions to ack.
> If it fails, the job retries rather than terminating with a partial snapshot.
>
> 4. Limitations section
>
> I agree with you, and I'll add a section covering all limitations:
>
> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> NO_CLAIM restore mode -> Warn (require a global checkpoint before snapshot deletion)
> Changelog state backend -> Reject job submission
> Finished operators (FLIP-147) -> Force a global checkpoint when sources are exhausted
>
> Thanks again for the review; looking forward to your feedback.
>
> Regards,
> Raorao
>
> On Jun 19, 2026, at 22:51, Roman Khachatryan wrote:
>
> > Hi, thanks for your replies and sorry for the delay.
> >
> > Most of my questions were answered, but I still have some concerns.
> >
> > > If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
> >
> > Isn't the actual FLIP still missing? I only saw the Google Doc. Do you mind creating a page according to [1]?
> >
> > ----------------------------------------
> >
> > > 3. Checkpoint metadata layout
> > > Regional Checkpoint recombines state from different checkpoint IDs. To track this, we add a refCheckpointId field to OperatorSubtaskState in the metadata, indicating which historical checkpoint a subtask's state references.
> >
> > Could you explain how we find the right OperatorSubtaskState, especially in case of rescaling?
> > Does the proposal support rescaling?
> >
> > > 9. Finished operators
> > > The concern is: a finished operator's final commit notification gets skipped by Regional Checkpoint, and if this checkpoint is the last one, the operator never receives it. Could this cause data loss?
> > > In practice, the impact is limited:
> > > ● Failed Region tasks are already gone: By the time the Regional Checkpoint completes, tasks in the failed Region have already been restarted (decline) or cancelled (timeout). There is no task left to receive the notification anyway.
> > Checkpoint failure doesn't necessarily cause a restart (especially if this is limited to one region). The tasks should still be up and running.
> >
> > > ● maxConsecutiveFailures guarantees a global checkpoint: After reaching the limit, the next checkpoint is forced to be global, ensuring all tasks eventually receive notifyCheckpointComplete. We can't skip the same Region forever.
> > maxConsecutiveFailures might not be reached for the final checkpoint.
> >
> > > ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops the job gracefully, it triggers a full global snapshot, not a Regional Checkpoint. So the final checkpoint is always complete.
> > stop-with-savepoint should be fine, yes.
> >
> > To clarify, my concern is about jobs with bounded sources. In such cases, some subtasks might finish processing but still participate in checkpoints. After a successful checkpoint, they are guaranteed to receive a checkpoint completion notification, so that they can make side effects visible in external systems (e.g., commit Kafka transactions). See FLIP-147 [2].
> >
> > However, with the current proposal, the job might complete with some subtasks/regions failing the final checkpoint, unless I'm missing something. This is essentially data loss. To prevent this, the final checkpoint must always be acked by all subtasks/regions.
> >
> > ----------------------------------------
> >
> > There are quite a few limitations in this proposal. Could you add a section describing how each of them is handled?
> > 1. Reject job submission
> > 2. Force all-region checkpoint
> > 3. Warn in documentation
> >
> > > 1. Region independence: BLOCKING/HYBRID edges
> > > You're right. Our current scope is limited to embarrassingly parallel regions. In typical ETL scenarios, each parallel instance maps to an independent Region with no edges connecting it to the others.
> >
> > > 5. SharedStateRegistry: how are old states kept alive?
> > > Good question. In the current design, since we only target embarrassingly parallel regions, there is typically no keyed state and no incremental state. As a result, the SharedStateRegistry is generally empty (setting aside File Merging and Changelog State for now, discussed in item 8), so keep-alive of files under the shared directory is not a concern.
> >
> > > 8. FLINK-26803 and FLIP-306 compatibility
> > > This is a very important point. Both features essentially merge small files at the job level. As Rui Fan pointed out, if the merging granularity is reduced to the Region level, compatibility with Regional Checkpoint should be achievable in theory. I think this can be deferred to future work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and enable support.
> >
> > > 10. NO_CLAIM mode warning
> > > You're absolutely right; this is an important reminder. After restoring from a Regional Checkpoint, only a successful global checkpoint guarantees independence from the old state. We'll add a clear user warning in the documentation.
> >
> > > 11. Changelog state backend: not supported
> > > As mentioned earlier, our primary target is embarrassingly parallel regions, which typically have no keyed state and therefore no slow incremental state flush issues. I don't think we need to support the Changelog state backend for now.
> >
> > ----------------------------------------
> >
> > > 2. max-consecutive-failures exceeded: what exactly happens?
> > > The current design says "force a global checkpoint." To clarify the two-tier behavior:
> > > ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the next checkpoint is forced to be global.
> > > ● Tier 2: If that forced global checkpoint also fails (any task declines), the checkpoint is aborted normally (not a job failure). The counter is then reset, since a global checkpoint was attempted, and the next checkpoint cycle can try again.
> > > This avoids cascading into job failure while ensuring we don't drift indefinitely on historical state.
> >
> > My assumption was that we would not allow this particular failed region to fail the checkpoint again.
> > But forcing a global checkpoint works as well.
> >
> > > 6. Checkpoint abort notifications & Local Recovery cleanup: new notification type
> > > This is a very insightful point. Zihao and Gen also raised this in earlier discussions. The current design doesn't address state cleanup for tasks in failed regions. I agree it's necessary to introduce a new notification type. For tasks in failed regions, local state cleanup can be deferred until the next checkpoint trigger.
> > Ok, this can be some future work.
> >
> > > 7. Task that neither acknowledges nor declines: per-region timeouts
> > > This was discussed in the previous thread. Network issues may cause a task to neither ack nor decline in time. In such cases, we treat it as a checkpoint timeout: the affected tasks' region is marked as failed, and the process ultimately falls through to the normal Regional Checkpoint processing logic.
> > Ok, this can be some future work.
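For illustration only, not part of the FLIP or of any Flink API: a minimal sketch of the checkpoint-scope rules described in this thread, namely the Tier 1/Tier 2 handling of max-consecutive-failures and the proposed mandatory global checkpoint once all bounded sources are exhausted. The names `CheckpointScope`, `CheckpointScopeDecider`, `decideNext`, `markAllSourcesExhausted` and `onCheckpointFinished` are hypothetical; only `consecutiveRegionalCount` and `maxConsecutiveFailures` come from the discussion.

```java
/** Hypothetical scope of the next checkpoint (not a Flink type). */
enum CheckpointScope {
    REGIONAL_ALLOWED, // failed regions may fall back to historical state
    GLOBAL_REQUIRED   // every region must ack, otherwise the checkpoint is aborted
}

/**
 * Hypothetical sketch of the scope rules discussed in this thread.
 * None of these names exist in Flink; this is not the FLIP's implementation.
 */
final class CheckpointScopeDecider {
    private final int maxConsecutiveFailures;
    private int consecutiveRegionalCount = 0;
    private boolean allSourcesExhausted = false;

    CheckpointScopeDecider(int maxConsecutiveFailures) {
        if (maxConsecutiveFailures < 1) {
            throw new IllegalArgumentException("maxConsecutiveFailures must be >= 1");
        }
        this.maxConsecutiveFailures = maxConsecutiveFailures;
    }

    /** Bounded jobs (FLIP-147): called once all sources have finished. */
    void markAllSourcesExhausted() {
        allSourcesExhausted = true;
    }

    /** Decides the scope of the next checkpoint before it is triggered. */
    CheckpointScope decideNext() {
        // The final checkpoint of a bounded job must be acked by all regions,
        // so that every finished subtask receives notifyCheckpointComplete.
        if (allSourcesExhausted) {
            return CheckpointScope.GLOBAL_REQUIRED;
        }
        // Tier 1: too many regional checkpoints in a row forces a global one.
        if (consecutiveRegionalCount >= maxConsecutiveFailures) {
            return CheckpointScope.GLOBAL_REQUIRED;
        }
        return CheckpointScope.REGIONAL_ALLOWED;
    }

    /**
     * Records the outcome of a checkpoint that was triggered with {@code scope}.
     *
     * @param failedRegions number of regions that declined or timed out
     */
    void onCheckpointFinished(CheckpointScope scope, int failedRegions) {
        if (scope == CheckpointScope.GLOBAL_REQUIRED) {
            // Tier 2: a global checkpoint was attempted. If any region failed, it is
            // aborted normally (no job failure); either way the counter is reset.
            consecutiveRegionalCount = 0;
        } else if (failedRegions > 0) {
            // Completed as a regional checkpoint that reuses historical state.
            consecutiveRegionalCount++;
        } else {
            // Every region acked, so the checkpoint is effectively global.
            consecutiveRegionalCount = 0;
        }
    }
}
```

Because `allSourcesExhausted` is never cleared in this sketch, a failed final checkpoint is simply retried as a global one, which matches the "retry rather than terminate with a partial snapshot" behavior proposed above.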
> >
> > ----------------------------------------
> >
> > [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >
> > Regards,
> > Roman
> >
> > On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 wrote:
> >
> > > Hi all,
> > >
> > > Thanks everyone for the valuable feedback. I believe all the points raised above have been addressed (@Roman @Rui Fan). If there are no further concerns by next Monday (June 22), I'll go ahead and start the [VOTE] thread for this FLIP.
> > >
> > > For reference, the earlier related discussion can be found here:
> > > https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> > >
> > > Please feel free to share any additional feedback before then.
> > >
> > > Best Regards,
> > > Raorao
> > >
> > > On May 27, 2026, at 16:31, 熊饶饶 wrote:
> > >
> > > Hi devs,
> > >
> > > I would like to start a discussion on FLIP-XXX: Independent Checkpoint Based On Pipeline Region.
> > >
> > > In high-parallelism streaming jobs, a single Task's checkpoint failure causes the entire global Checkpoint to abort, leading to degraded checkpoint success rates and wasted compute resources (especially for GPU operators).
> > >
> > > We propose Regional Checkpoint: when some Regions fail to checkpoint, the framework combines the historical state of the failed Regions with the current state of the healthy Regions to produce a logically complete Completed Checkpoint while preserving state consistency. The key changes are:
> > >
> > > 1. Snapshot Collection: Allow partial region failures; combine the last successful state of failed Regions with the current state of normal Regions.
> > >
> > > 2. State Correction: New checkpointCoordinatorForRegionFallback interface for OperatorCoordinators to produce consistent snapshots against the mixed view.
> > >
> > > 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent premature cleanup of referenced historical checkpoints.
> > >
> > > The detailed design is described in the FLIP document:
> > >
> > > https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> > >
> > > Looking forward to your feedback!
> > >
> > > Best regards,
> > >
> > > Raorao Xiong
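A second sketch, again hypothetical and only illustrative: it shows how snapshot collection (failed regions fall back to the state they used in the previous checkpoint) and `ref_checkpoint_id` tracking could keep referenced historical checkpoints from being discarded. The classes `RegionalCheckpointMetadata` and `ReferenceAwareCleaner` are invented for this example; references are kept per region rather than per `OperatorSubtaskState` for brevity, and shared state and file merging are ignored.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical checkpoint metadata: for each region, the id of the checkpoint whose
 * state that region uses. Keyed by region (not per OperatorSubtaskState) for brevity.
 */
final class RegionalCheckpointMetadata {
    final long checkpointId;
    final Map<String, Long> refCheckpointIds;

    RegionalCheckpointMetadata(long checkpointId, Map<String, Long> refCheckpointIds) {
        this.checkpointId = checkpointId;
        this.refCheckpointIds = Collections.unmodifiableMap(new HashMap<>(refCheckpointIds));
    }

    /**
     * Snapshot collection: healthy regions reference the current checkpoint, while failed
     * regions keep referencing whatever checkpoint they used in {@code previous}.
     */
    static RegionalCheckpointMetadata combine(
            long checkpointId,
            Set<String> allRegions,
            Set<String> failedRegions,
            RegionalCheckpointMetadata previous) {
        Map<String, Long> refs = new HashMap<>();
        for (String region : allRegions) {
            if (!failedRegions.contains(region)) {
                refs.put(region, checkpointId);
                continue;
            }
            Long ref = previous == null ? null : previous.refCheckpointIds.get(region);
            if (ref == null) {
                // No historical state to fall back to, so this checkpoint cannot complete.
                throw new IllegalStateException("no historical state for region " + region);
            }
            refs.put(region, ref);
        }
        return new RegionalCheckpointMetadata(checkpointId, refs);
    }
}

/** Hypothetical cleaner rule: never discard state that a retained checkpoint still references. */
final class ReferenceAwareCleaner {
    static boolean canDiscard(long candidateId, Iterable<RegionalCheckpointMetadata> retained) {
        for (RegionalCheckpointMetadata metadata : retained) {
            if (metadata.refCheckpointIds.containsValue(candidateId)) {
                return false;
            }
        }
        return true;
    }
}
```

In this model, checkpoint 1 becomes discardable only after a later checkpoint in which every region succeeds, which is why a run of regional checkpoints can keep older files alive for longer than global checkpoints would.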
