Hi Raorao,

thanks for your answers.

> 2. Rescaling for OperatorSubtaskState
> `refCheckpointId` is checkpoint-level metadata stored in the completed
> checkpoint - it doesn't participate in state redistribution during
> rescaling. The `TaskStateAssignment` redistributes only the state handles
> to new subtasks, while the `CompletedCheckpointStore` retains the original
> metadata (including `refCheckpointId`) for the cleaner to trace reference
> chains. So rescaling is inherently compatible.

Do you mean that we'll run StateAssignmentOperation for every referenced
checkpoint on every "partial" checkpoint completion? (That can be quite
heavy.)

I don't think the "redistributes only the state handles to new subtasks"
part is true: on rescaling, the distribution changes for (potentially)
every sub-task.

From the docs [1]:
- Even-split redistribution: Each operator returns a List of state
  elements. The whole state is logically a concatenation of all lists. On
  restore/redistribution, the list is evenly divided into as many sublists
  as there are parallel operators. Each operator gets a sublist, which can
  be empty, or contain one or more elements.
- Union redistribution: Each operator returns a List of state elements.
  The whole state is logically a concatenation of all lists. On
  restore/redistribution, each operator gets the complete list of state
  elements.
...
- (we said earlier that keyed state is not supported)

So I don't see how any of the above can work, because we might mix old and
new states in a single sub-task. For example:
- a job is running with parallelism 2 and uses even-split state
  redistribution
- the 1st checkpoint is completed by 2/2 sub-tasks
- the 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the
  checkpoint, so its state in checkpoint 2 refers to "sub-task 2 state in
  checkpoint 1"
- the job is rescaled to parallelism 1
- sub-task 1 now has state from checkpoints 1 AND 2?

Furthermore, with multiple rounds of downscaling, a single sub-task can end
up referring to multiple historical checkpoints. For Union, it's even more
problematic.
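To make the downscaling example above concrete, here is a minimal Python sketch of the two redistribution modes as the docs word them (concatenate all lists, then divide evenly or hand out the full list). It is a toy model, not Flink's actual repartitioner: `StateElement`, `even_split`, `union` and `origins` are hypothetical names introduced only for illustration, and `origin_checkpoint` stands in for the checkpoint whose files physically hold an element.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StateElement:
    value: str
    origin_checkpoint: int  # hypothetical: checkpoint that wrote this element


def even_split(subtask_lists, new_parallelism):
    """Concatenate all per-subtask lists, then divide them evenly."""
    merged = [e for sub in subtask_lists for e in sub]
    base, extra = divmod(len(merged), new_parallelism)
    result, start = [], 0
    for i in range(new_parallelism):
        size = base + (1 if i < extra else 0)
        result.append(merged[start:start + size])
        start += size
    return result


def union(subtask_lists, new_parallelism):
    """Every new subtask receives the complete concatenated list."""
    merged = [e for sub in subtask_lists for e in sub]
    return [list(merged) for _ in range(new_parallelism)]


def origins(elements):
    """Distinct checkpoints that a subtask's restored state comes from."""
    return sorted({e.origin_checkpoint for e in elements})


# Checkpoint 2 as stored: sub-task 1 acked it, sub-task 2 failed and
# therefore still points at its state from checkpoint 1.
checkpoint_2 = [
    [StateElement("a2", 2), StateElement("b2", 2)],  # sub-task 1, fresh
    [StateElement("c1", 1), StateElement("d1", 1)],  # sub-task 2, refers to 1
]

rescaled = even_split(checkpoint_2, 1)
print(origins(rescaled[0]))  # [1, 2]
```

With union redistribution the same mix shows up in every new sub-task, e.g. `[origins(s) for s in union(checkpoint_2, 3)]` yields `[1, 2]` three times, so a single per-subtask reference cannot describe the restored state.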
Also, what about channel state (unaligned checkpoint) redistribution?

It's probably better to have a FLIP document with the corresponding section
first and then discuss it.

[1] https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state

Regards,
Roman

On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:

> Hi Zakelly,
>
> Thanks for the feedback! I completely agree: forcing region-level merging
> would significantly reduce the effectiveness of file merging, and that's
> not the right trade-off.
>
> I think TM-level merging can work correctly with Regional Checkpoint if we
> ensure the FileMergingSnapshotManager properly handles the new checkpoint
> notification semantics. Specifically:
>
> - The FileMergingSnapshotManager already uses a notification-based
>   lifecycle (checkpoint complete/abort/subsumed) to manage physical file
>   deletion.
> - For Regional Checkpoint, we need to introduce the new notification type
>   we discussed earlier (notify partially-completed) so that the merging
>   manager knows: this checkpoint is complete but some segments belong to
>   failed regions, so keep the physical files that contain those segments
>   alive until the referencing checkpoints are subsumed.
> - This aligns with the existing design and requires only minor adjustments
>   to the merging manager's notification handling.
>
> The previous suggestion of region-level merging was overly conservative;
> glad you pointed out the better approach.
>
> Also good to know about the plan to cover channel state in FLIP-306. That
> will simplify the compatibility matrix significantly.
>
> Best Regards,
> Raorao
>
> > On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> >
> > Hi Raorao,
> >
> > Good to see this proposal and +1 for the direction. Sorry for joining
> > the discussion late.
> > I have seen many constructive suggestions that largely align with my
> > thoughts, but I still have one concern:
> >
> > I'm one of the authors of FLIP-306 and I'm not in favor of region-level
> > merging. IIUC, region-level merging severely limits the effectiveness of
> > merging, as merging cannot happen between subtasks. I think it should
> > still be possible to perform TM-level merging. The only thing we should
> > do is keep previous checkpoint files alive when a region checkpoint
> > occurs. This does not conflict with the current design. It is only
> > necessary to ensure that the new behavior of checkpoint notifications is
> > compatible with FLIP-306, or to make some minor adjustments to the
> > merging manager.
> >
> > And BTW to @Rui, we still need more work to let FLIP-306 cover channel
> > state and deprecate FLINK-26803, and I will look into this soon.
> >
> > Best,
> > Zakelly
> >
> > On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> >
> >> Hi Roman, thanks for the follow-up.
> >>
> >> 1. FLIP page
> >>
> >> Thanks for the reminder, I'll create the proper FLIP page before
> >> starting the [VOTE] thread.
> >>
> >> 2. Rescaling for OperatorSubtaskState
> >>
> >> `refCheckpointId` is checkpoint-level metadata stored in the completed
> >> checkpoint - it doesn't participate in state redistribution during
> >> rescaling. The `TaskStateAssignment` redistributes only the state
> >> handles to new subtasks, while the `CompletedCheckpointStore` retains
> >> the original metadata (including `refCheckpointId`) for the cleaner to
> >> trace reference chains. So rescaling is inherently compatible.
> >>
> >> 3. Finished operators
> >>
> >> You're right, my previous answer missed the real issue.
> >> For bounded source jobs (FLIP-147), if the final checkpoint is Regional
> >> and a failed Region's subtask misses `notifyCheckpointComplete`, its
> >> side effects (e.g., Kafka transactions) are never committed, which is
> >> data loss.
> >>
> >> Solution: force a global checkpoint when the job is about to terminate.
> >> When all sources are exhausted, the JM will mark the next checkpoint as
> >> mandatory global, requiring all regions to ack. If it fails, the job
> >> retries rather than terminating with a partial snapshot.
> >>
> >> 4. Limitations section
> >>
> >> I agree with you, and I'll add a section covering all limitations:
> >>
> >> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >> NO_CLAIM restore mode -> Warn (require global checkpoint before
> >>   snapshot deletion)
> >> Changelog state backend -> Reject job submission
> >> Finished operators (FLIP-147) -> Force global checkpoint when sources
> >>   are exhausted
> >>
> >> Thanks again for the review, looking forward to your feedback.
> >>
> >> Regards,
> >> Raorao
> >>
> >>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> >>>
> >>> Hi, thanks for your replies and sorry for the delay.
> >>>
> >>> Most of my questions were answered, but I still have some concerns.
> >>>
> >>>> If there are no further concerns by next Monday (June 22), I'll go
> >>>> ahead and start the [VOTE] thread for this FLIP.
> >>>
> >>> Isn't the actual FLIP still missing? I only saw a Google Document. Do
> >>> you mind creating a page according to [1]?
> >>>
> >>> ----------------------------------------
> >>>
> >>>> 3. Checkpoint metadata layout
> >>>> Regional Checkpoint recombines state from different checkpoint IDs. To
> >>>> track this, we add a refCheckpointId field to OperatorSubtaskState in
> >>>> the metadata, indicating which historical checkpoint a subtask’s state
> >>>> references.
> >>>
> >>> Could you explain how we find the right OperatorSubtaskState,
> >>> especially in case of rescaling?
> >>> Does the proposal support rescaling?
> >>>
> >>>> 9. Finished operators
> >>>> The concern is: a finished operator’s final commit notification gets
> >>>> skipped by Regional Checkpoint, and if this checkpoint is the last
> >>>> one, the operator never receives it. Could this cause data loss?
> >>>> In practice, the impact is limited:
> >>>> ● Failed Region tasks are already gone: By the time the Regional
> >>>> Checkpoint completes, tasks in the failed Region have already been
> >>>> restarted (decline) or cancelled (timeout). There is no task left to
> >>>> receive the notification anyway.
> >>> Checkpoint failure doesn't necessarily cause a restart (especially if
> >>> this is limited to one region). The tasks should still be up and
> >>> running.
> >>>
> >>>> ● maxConsecutiveFailures guarantees a global checkpoint: After
> >>>> reaching the limit, the next checkpoint is forced to be global,
> >>>> ensuring all tasks eventually receive notifyCheckpointComplete. We
> >>>> can’t skip the same Region forever.
> >>> maxConsecutiveFailures might not be reached for the final checkpoint.
> >>>
> >>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user
> >>>> stops the job gracefully, it triggers a full global snapshot, not a
> >>>> Regional Checkpoint. So the final checkpoint is always complete.
> >>> stop-with-savepoint should be fine, yes.
> >>>
> >>> To clarify, my concern is about jobs with bounded sources. In such
> >>> cases, some subtasks might finish processing but still participate in
> >>> checkpoints. After a successful checkpoint, they are guaranteed to get
> >>> a checkpoint completion notification, so that they can make side
> >>> effects visible in external systems (commit Kafka transactions).
> >>> See FLIP-147 [2]
> >>>
> >>> However, with the current proposal, the job might complete with some
> >>> subtasks/regions failing the final checkpoint, unless I'm missing
> >>> something. This is essentially data loss.
> >>> To prevent this, the final checkpoint must always be acked by all
> >>> subtasks/regions.
> >>>
> >>> ----------------------------------------
> >>>
> >>> There are quite a few limitations in this proposal.
> >>> Could you add a section describing how each of them is handled?
> >>> 1. Reject job submission
> >>> 2. Force all-region checkpoint
> >>> 3. Warn in documentation
> >>>
> >>>> 1. Region independence: BLOCKING/HYBRID edges
> >>>> You’re right. Our current scope is limited to embarrassingly parallel
> >>>> regions. In typical ETL scenarios, each parallelism maps to an
> >>>> independent Region with no edges connecting them.
> >>>
> >>>> 5. SharedStateRegistry: how are old states kept alive?
> >>>> Good question. In the current design, since we only target
> >>>> embarrassingly parallel regions, there is typically no keyed state and
> >>>> no incremental state. As a result, the SharedStateRegistry is
> >>>> generally empty (setting aside File Merging and Changelog State for
> >>>> now, discussed in 8.), so keep-alive of files under the shared
> >>>> directory is not a concern.
> >>>
> >>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>> This is a very important point. Both features essentially merge small
> >>>> files at the job level. As Rui Fan pointed out, if the merging
> >>>> granularity is reduced to the Region level, compatibility with
> >>>> Regional Checkpoint should be achievable in theory. I think this can
> >>>> be deferred to future work: once FLINK-26803 is consolidated into
> >>>> FLIP-306, we can revisit and enable support.
> >>>
> >>>> 10. NO_CLAIM mode warning
> >>>> You’re absolutely right, this is an important reminder.
> >>>> After restoring from a Regional Checkpoint, only a successful global
> >>>> checkpoint guarantees independence from the old state. We’ll add a
> >>>> clear user warning in the documentation.
> >>>
> >>>> 11. Changelog state backend: not supported
> >>>> As mentioned earlier, our primary target is embarrassingly parallel
> >>>> regions, which typically have no keyed state and therefore no slow
> >>>> incremental state flush issues. I don’t think we need to support
> >>>> Changelog state backend for now.
> >>>
> >>> ----------------------------------------
> >>>
> >>>> 2. max-consecutive-failures exceeded: what exactly happens?
> >>>> The current design says “force a global checkpoint.” To clarify the
> >>>> two-tier behavior:
> >>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures,
> >>>> the next checkpoint is forced to be global.
> >>>> ● Tier 2: If that forced global checkpoint also fails (any task
> >>>> declines), the checkpoint is aborted normally (not a job failure).
> >>>> The counter is then reset since a global checkpoint was attempted,
> >>>> and the next checkpoint cycle can try again.
> >>>> This avoids cascading into job failure while ensuring we don’t drift
> >>>> indefinitely on historical state.
> >>>
> >>> My assumption was that we would not allow this particular failed region
> >>> to fail the checkpoint again.
> >>> But forcing a global checkpoint works as well.
> >>>
> >>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new
> >>>> notification type
> >>>> This is a very insightful point. Zihao and Gen also raised this in
> >>>> earlier discussions. The current design doesn’t address state cleanup
> >>>> for tasks in failed regions. I agree it’s necessary to introduce a new
> >>>> notification type. For tasks in failed regions, local state cleanup
> >>>> can be deferred until the next checkpoint trigger.
> >>> Ok, this can be some future work.
> >>>
> >>>> 7. Task that never acknowledges nor declines: per-region timeouts
> >>>> This was discussed in the previous thread. Network issues may cause a
> >>>> task to neither ack nor decline in time. In such cases, we treat it as
> >>>> a checkpoint timeout: the affected tasks’ region is marked as failed,
> >>>> and the process ultimately falls through to the normal Regional
> >>>> Checkpoint processing logic.
> >>> Ok, this can be some future work.
> >>>
> >>> ----------------------------------------
> >>>
> >>> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >>>
> >>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> Thanks everyone for the valuable feedback. I believe all the points
> >>>> raised above have been addressed (@Roman @Rui Fan). If there are no
> >>>> further concerns by next Monday (June 22), I'll go ahead and start the
> >>>> [VOTE] thread for this FLIP.
> >>>>
> >>>> For reference, the earlier related discussion can be found here:
> >>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>>>
> >>>> Please feel free to share any additional feedback before then.
> >>>>
> >>>> Best Regards,
> >>>> Raorao
> >>>>
> >>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >>>>
> >>>> Hi devs,
> >>>>
> >>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> >>>> Based On Pipeline Region.
> >>>>
> >>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
> >>>> causes the entire global Checkpoint to abort, leading to degraded
> >>>> checkpoint success rates and wasted compute resources (especially for
> >>>> GPU operators).
> >>>>
> >>>> We propose Regional Checkpoint: when some Regions fail to checkpoint,
> >>>> the framework combines the historical state of the failed Regions with
> >>>> the current state of the healthy Regions to produce a logically
> >>>> complete Completed Checkpoint, while preserving state consistency. The
> >>>> key changes are:
> >>>>
> >>>> 1. Snapshot Collection: Allow partial region failures; combine last
> >>>> successful state of failed Regions with current state of normal
> >>>> Regions.
> >>>>
> >>>> 2. State Correction: New checkpointCoordinatorForRegionFallback
> >>>> interface for OperatorCoordinators to produce consistent snapshots
> >>>> against the mixed view.
> >>>>
> >>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
> >>>> premature cleanup of referenced historical checkpoints.
> >>>>
> >>>> The detailed design is described in the FLIP document:
> >>>>
> >>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>>>
> >>>> Looking forward to your feedback!
> >>>>
> >>>> Best regards,
> >>>>
> >>>> Raorao Xiong
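
As a rough illustration of item 3 in the original proposal (tracking ref_checkpoint_id so that referenced historical checkpoints are not cleaned up too early), here is a minimal Python sketch. It is not taken from the design document: `CompletedCheckpoint`, `ref_checkpoint_ids` and `checkpoints_to_keep` are hypothetical names, and the sketch assumes one reference per region and that references may form chains.

```python
from dataclasses import dataclass, field


@dataclass
class CompletedCheckpoint:
    checkpoint_id: int
    # Hypothetical layout: region name -> checkpoint whose files hold that
    # region's state. Equals checkpoint_id when the region acked itself.
    ref_checkpoint_ids: dict = field(default_factory=dict)


def checkpoints_to_keep(store, num_retained):
    """Ids whose files must stay: the newest ones plus all they reference."""
    by_id = {c.checkpoint_id: c for c in store}
    stack = sorted(by_id, reverse=True)[:num_retained]
    keep = set()
    while stack:
        cid = stack.pop()
        if cid in keep or cid not in by_id:
            continue
        keep.add(cid)
        # Follow references transitively in case they form a chain.
        stack.extend(by_id[cid].ref_checkpoint_ids.values())
    return keep


store = [
    CompletedCheckpoint(1, {"A": 1, "B": 1}),  # global
    CompletedCheckpoint(2, {"A": 2, "B": 1}),  # region B failed
    CompletedCheckpoint(3, {"A": 3, "B": 1}),  # region B failed again
    CompletedCheckpoint(4, {"A": 4, "B": 4}),  # global again
]

print(checkpoints_to_keep(store[:3], 1))  # {1, 3}
print(checkpoints_to_keep(store, 1))      # {4}
```

In this toy model checkpoint 1 can only be dropped once a global checkpoint (4) has completed, which matches the NO_CLAIM limitation discussed in the thread.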
