Hi Raorao, thanks for your answers.

> > 2. Rescaling for OperatorSubtaskState

> `refCheckpointId` is checkpoint-level metadata stored in the completed
> checkpoint - it doesn't participate in state redistribution during
> rescaling. The `TaskStateAssignment` redistributes only the state handles
> to new subtasks, while the `CompletedCheckpointStore` retains the original
> metadata (including `refCheckpointId`) for the cleaner to trace reference
> chains. So rescaling is inherently compatible.

Do you mean that we'll run StateAssignmentOperation for every referenced
checkpoint on every "partial" checkpoint completion? That could be quite
heavy.

I don't think the "redistributes only the state handles to new subtasks"
part is accurate: on rescaling, the state distribution changes for
(potentially) every sub-task.

From the docs [1]:
- Even-split redistribution: Each operator returns a List of state
elements. The whole state is logically a concatenation of all lists. On
restore/redistribution, the list is evenly divided into as many sublists as
there are parallel operators. Each operator gets a sublist, which can be
empty, or contain one or more elements.
- Union redistribution: Each operator returns a List of state elements. The
whole state is logically a concatenation of all lists. On
restore/redistribution, each operator gets the complete list of state
elements. ...
- (we said earlier that keyed state is not supported)

So I don't see how any of the above can work, because we might mix old and
new state in a single sub-task.

For example:
- a job is running with parallelism 2 and has even-split state distribution
- the 1st checkpoint is completed by 2/2 sub-tasks
- the 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the
checkpoint, so its state in checkpoint 2 refers to "sub-task 2 state in
checkpoint 1"
- the job is rescaled to parallelism 1
- sub-task 1 now has state from checkpoint 1 AND 2?
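
To make the scenario concrete, here is a toy sketch of it. It is not Flink
code: `MixedStateSketch`, `Element`, `evenSplit` and `checkpointsReferenced`
are hypothetical names, and the contiguous split is only an assumption about
how "evenly divided" could look. Each list element is tagged with the
checkpoint that wrote it, so we can see what a restored sub-task ends up with.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Toy model only: these types are hypothetical and are not Flink classes.
class MixedStateSketch {

    /** One operator-state list element, tagged with the checkpoint that wrote it. */
    static final class Element {
        final String value;
        final long writtenBy; // checkpoint ID that produced this element

        Element(String value, long writtenBy) {
            this.value = value;
            this.writtenBy = writtenBy;
        }
    }

    /** Even-split: concatenate all lists, then cut them into newParallelism sublists. */
    static List<List<Element>> evenSplit(List<List<Element>> perSubtask, int newParallelism) {
        List<Element> all = new ArrayList<>();
        for (List<Element> state : perSubtask) {
            all.addAll(state);
        }
        List<List<Element>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int remainder = all.size() % newParallelism;
        int from = 0;
        for (int i = 0; i < newParallelism; i++) {
            int size = base + (i < remainder ? 1 : 0);
            result.add(new ArrayList<>(all.subList(from, from + size)));
            from += size;
        }
        return result;
    }

    /** Distinct checkpoint IDs that a restored sub-task's state originates from. */
    static Set<Long> checkpointsReferenced(List<Element> state) {
        Set<Long> ids = new TreeSet<>();
        for (Element e : state) {
            ids.add(e.writtenBy);
        }
        return ids;
    }

    public static void main(String[] args) {
        // Checkpoint 2: sub-task 1 acked; sub-task 2 falls back to its checkpoint-1 state.
        List<Element> subtask1 = List.of(new Element("a", 2), new Element("b", 2));
        List<Element> subtask2 = List.of(new Element("c", 1), new Element("d", 1));

        // Rescale from parallelism 2 to 1.
        List<List<Element>> rescaled = evenSplit(List.of(subtask1, subtask2), 1);
        System.out.println(checkpointsReferenced(rescaled.get(0))); // prints [1, 2]
    }
}
```

In this model the single remaining sub-task references two checkpoints, so
one `refCheckpointId` per OperatorSubtaskState would not be enough to
describe it.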

Furthermore, with multiple rounds of downscaling, there can be a single
sub-task referring to multiple historical checkpoints.

For Union, it's even more problematic: every sub-task gets the complete
list, and therefore state from every referenced checkpoint.
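
A similar toy sketch for the union case, under the same caveats:
`UnionStateSketch` and `union` are hypothetical names, not Flink classes,
and each state element is represented only by the ID of the checkpoint that
wrote it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model only: hypothetical names, not Flink classes.
class UnionStateSketch {

    /** Union redistribution: every new sub-task gets the full concatenated list. */
    static List<List<Long>> union(List<List<Long>> perSubtask, int newParallelism) {
        List<Long> all = new ArrayList<>();
        for (List<Long> state : perSubtask) {
            all.addAll(state);
        }
        List<List<Long>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(all)); // each sub-task receives its own full copy
        }
        return result;
    }

    public static void main(String[] args) {
        // Elements are the IDs of the checkpoints that wrote them.
        List<Long> subtask1 = List.of(2L, 2L); // acked checkpoint 2
        List<Long> subtask2 = List.of(1L, 1L); // fell back to checkpoint 1

        // Rescale from parallelism 2 to 3.
        for (List<Long> restored : union(List.of(subtask1, subtask2), 3)) {
            System.out.println(new TreeSet<>(restored)); // prints [1, 2] for each sub-task
        }
    }
}
```

Here every restored sub-task references both checkpoints, regardless of the
new parallelism.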

Also, what about channel state (Unaligned checkpoint) re-distribution?

Probably it's better to have a FLIP document with the corresponding section
first and then discuss it.

[1]
https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state

Regards,
Roman


On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:

> Hi Zakelly,
>
> Thanks for the feedback! I completely agree — forcing region-level merging
> would significantly reduce the effectiveness of file merging, and that's
> not the right trade-off.
>
> I think TM-level merging can work correctly with Regional Checkpoint if we
> ensure the FileMergingSnapshotManager properly handles the new checkpoint
> notification semantics. Specifically:
>
> - The FileMergingSnapshotManager already uses a notification-based
> lifecycle (checkpoint complete/abort/subsumed) to manage physical file
> deletion.
> - For Regional Checkpoint, we need to introduce the new notification type
> we discussed earlier (notify partially-completed) so that the merging
> manager knows: this checkpoint is complete but some segments belong to
> failed regions — keep the physical files that contain those segments alive
> until the referencing checkpoints are subsumed.
> - This aligns with the existing design and requires only minor adjustments
> to the merging manager's notification handling.
>
> The previous suggestion of region-level merging was overly conservative —
> glad you pointed out the better approach.
>
> Also good to know about the plan to cover channel state in FLIP-306. That
> will simplify the compatibility matrix significantly.
>
> Best Regards,
> Raorao
>
> > On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
> >
> > Hi Raorao,
> >
> > Good to see this proposal and +1 for the direction. Sorry for joining the
> > discussion late. I have seen many constructive suggestions that largely
> > align with my thoughts, but I still have one concern:
> >
> > I'm one of the authors of FLIP-306 and I'm not in favor of region-level
> > merging. IIUC, region-level merge files severely limit the effectiveness
> > of merging, as merging cannot happen between subtasks. I think it should
> > still be possible to perform TM-level merge. The only thing we should do
> > is to keep previous checkpoint files alive when a region checkpoint
> > occurs. This does not conflict with the current design. It is only
> > necessary to ensure that the new behavior of checkpoint notifications is
> > compatible with FLIP-306, or to make some minor adjustments to the
> > merging manager.
> >
> > And BTW to @Rui, we still need more work to let FLIP-306 cover channel
> > state and deprecate FLINK-26803, and I will look into this soon.
> >
> >
> > Best,
> > Zakelly
> >
> > On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
> >
> >> Hi Roman, thanks for the follow-up.
> >>
> >> 1. FLIP page
> >>
> >> Thanks for your reminder, I'll create the proper FLIP page before
> >> starting the [VOTE] thread.
> >>
> >> 2. Rescaling for OperatorSubtaskState
> >>
> >> `refCheckpointId` is checkpoint-level metadata stored in the completed
> >> checkpoint - it doesn't participate in state redistribution during
> >> rescaling. The `TaskStateAssignment` redistributes only the state
> >> handles to new subtasks, while the `CompletedCheckpointStore` retains
> >> the original metadata (including `refCheckpointId`) for the cleaner to
> >> trace reference chains. So rescaling is inherently compatible.
> >>
> >> 3. Finished operators
> >>
> >> You're right, my previous answer missed the real issue. For bounded
> >> source jobs (FLIP-147), if the final checkpoint is Regional and a failed
> >> Region's subtasks miss `notifyCheckpointComplete`, their side effects
> >> (e.g., Kafka transactions) are never committed - that's data loss.
> >>
> >> Solution: force a global checkpoint when the job is about to terminate.
> >> When all sources are exhausted, the JM will mark the next checkpoint as
> >> mandatory global, requiring all regions to ack. If it fails, the job
> >> retries rather than terminating with a partial snapshot.
> >>
> >> 4. Limitations section
> >>
> >> I agree with you, and I'll add a section covering all limitations:
> >>
> >> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
> >> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
> >> NO_CLAIM restore mode -> Warn (require global checkpoint before
> >>   snapshot deletion)
> >> Changelog state backend -> Reject job submission
> >> Finished operators (FLIP-147) -> Force global checkpoint when sources
> >>   are exhausted
> >>
> >>
> >> Thanks again for the review, looking forward to your feedback.
> >>
> >> Regards,
> >> Raorao
> >>
> >>
> >>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
> >>>
> >>> Hi, thanks for your replies and sorry for the delay.
> >>>
> >>> Most of my questions were answered, but I still have some concerns.
> >>>
> >>>> If there are no further concerns by next Monday (June 22), I'll go
> >>>> ahead and start the [VOTE] thread for this FLIP.
> >>>
> >>> Isn't the actual FLIP still missing? I only saw a Google Document. Do you
> >>> mind creating a page according to [1]?
> >>>
> >>> ----------------------------------------
> >>>
> >>>> 3. Checkpoint metadata layout
> >>>> Regional Checkpoint recombines state from different checkpoint IDs. To
> >>>> track this, we add a refCheckpointId field to OperatorSubtaskState in
> >>>> the metadata, indicating which historical checkpoint a subtask’s state
> >>>> references.
> >>>
> >>> Could you explain how we find the right OperatorSubtaskState,
> >>> especially in case of rescaling?
> >>> Does the proposal support rescaling?
> >>>
> >>>> 9. Finished operators
> >>>> The concern is: a finished operator’s final commit notification gets
> >>>> skipped by Regional Checkpoint, and if this checkpoint is the last one,
> >>>> the operator never receives it. Could this cause data loss?
> >>>> In practice, the impact is limited:
> >>>> ● Failed Region tasks are already gone: By the time the Regional
> >>>> Checkpoint completes, tasks in the failed Region have already been
> >>>> restarted (decline) or cancelled (timeout). There is no task left to
> >>>> receive the notification anyway.
> >>> Checkpoint failure doesn't necessarily cause a restart (especially if
> >>> this is limited to one region). The tasks should still be up and running.
> >>>
> >>>> ● maxConsecutiveFailures guarantees a global checkpoint: After
> >>>> reaching the limit, the next checkpoint is forced to be global,
> >>>> ensuring all tasks eventually receive notifyCheckpointComplete. We
> >>>> can’t skip the same Region forever.
> >>> maxConsecutiveFailures might not be reached for the final checkpoint.
> >>>
> >>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user
> >>>> stops the job gracefully, it triggers a full global snapshot, not a
> >>>> Regional Checkpoint. So the final checkpoint is always complete.
> >>> stop-with-savepoint should be fine, yes.
> >>>
> >>> To clarify, my concern is about jobs with bounded sources. In such
> >>> cases, some subtasks might finish processing but still participate in
> >>> checkpoints. After a successful checkpoint, they are guaranteed to get
> >>> a checkpoint completion notification, so that they can make side
> >>> effects visible in external systems (commit Kafka transactions).
> >>> See FLIP-147 [2]
> >>>
> >>> However, with the current proposal, the job might complete with some
> >>> subtasks/regions failing the final checkpoint, unless I'm missing
> >>> something. This is essentially data loss.
> >>> To prevent this, the final checkpoint must always be acked by all
> >>> subtasks/regions.
> >>>
> >>> ----------------------------------------
> >>>
> >>> There are quite a few limitations in this proposal.
> >>> Could you add a section describing how each of them is handled? E.g.:
> >>> 1. Reject job submission
> >>> 2. Force all-region checkpoint
> >>> 3. Warn in documentation
> >>>
> >>>> 1. Region independence — BLOCKING/HYBRID edges
> >>>> You’re right. Our current scope is limited to embarrassingly parallel
> >>>> regions. In typical ETL scenarios, each parallelism maps to an
> >>>> independent Region with no edges connecting them.
> >>>
> >>>> 5. SharedStateRegistry — how are old states kept alive?
> >>>> Good question. In the current design, since we only target
> >>>> embarrassingly parallel regions, there is typically no keyed state and
> >>>> no incremental state. As a result, the SharedStateRegistry is generally
> >>>> empty (setting aside File Merging and Changelog State for now,
> >>>> discussed in 8.), so keep-alive of files under the shared directory is
> >>>> not a concern.
> >>>
> >>>> 8. FLINK-26803 and FLIP-306 compatibility
> >>>> This is a very important point. Both features essentially merge small
> >>>> files at the job level. As Rui Fan pointed out, if the merging
> >>>> granularity is reduced to the Region level, compatibility with Regional
> >>>> Checkpoint should be achievable in theory. I think this can be deferred
> >>>> to future work: once FLINK-26803 is consolidated into FLIP-306, we can
> >>>> revisit and enable support.
> >>>
> >>>> 10. NO_CLAIM mode warning
> >>>> You’re absolutely right, this is an important reminder. After
> >>>> restoring from a Regional Checkpoint, only a successful global
> >>>> checkpoint guarantees independence from the old state. We’ll add a
> >>>> clear user warning in the documentation.
> >>>
> >>>> 11. Changelog state backend — not supported
> >>>> As mentioned earlier, our primary target is embarrassingly parallel
> >>>> regions, which typically have no keyed state and therefore no slow
> >>>> incremental state flush issues. I don’t think we need to support the
> >>>> Changelog state backend for now.
> >>>
> >>> ----------------------------------------
> >>>
> >>>> 2. max-consecutive-failures exceeded — what exactly happens?
> >>>> The current design says “force a global checkpoint.” To clarify the
> >>>> two-tier behavior:
> >>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
> >>>> next checkpoint is forced to be global.
> >>>> ● Tier 2: If that forced global checkpoint also fails (any task
> >>>> declines), the checkpoint is aborted normally (not a job failure). The
> >>>> counter is then reset since a global checkpoint was attempted, and the
> >>>> next checkpoint cycle can try again.
> >>>> This avoids cascading into job failure while ensuring we don’t drift
> >>>> indefinitely on historical state.
> >>>
> >>> My assumption was that we would not allow this particular failed region
> >>> to fail the checkpoint again.
> >>> But forcing a global checkpoint works as well.
> >>>
> >>>> 6. Checkpoint abort notifications & Local Recovery cleanup - new
> >>>> notification type
> >>>> This is a very insightful point. Zihao and Gen also raised this in
> >>>> earlier discussions. The current design doesn’t address state cleanup
> >>>> for tasks in failed regions. I agree it’s necessary to introduce a new
> >>>> notification type. For tasks in failed regions, local state cleanup can
> >>>> be deferred until the next checkpoint trigger.
> >>> Ok, this can be some future work.
> >>>
> >>>> 7. Task that never acknowledges nor declines — per-region timeouts
> >>>> This was discussed in the previous thread. Network issues may cause a
> >>>> task to neither ack nor decline in time. In such cases, we treat it as
> >>>> a checkpoint timeout: the affected tasks’ region is marked as failed,
> >>>> and the process ultimately falls through to the normal Regional
> >>>> Checkpoint processing logic.
> >>> Ok, this can be some future work.
> >>>
> >>> ----------------------------------------
> >>>
> >>> [1]
> >>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
> >>>
> >>> [2]
> >>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> >>>
> >>> Regards,
> >>> Roman
> >>>
> >>>
> >>>
> >>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> Thanks everyone for the valuable feedback. I believe all the points
> >>>> raised above have been addressed (@Roman @Rui Fan). If there are no
> >>>> further concerns by next Monday (June 22), I'll go ahead and start the
> >>>> [VOTE] thread for this FLIP.
> >>>>
> >>>> For reference, the earlier related discussion can be found here:
> >>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
> >>>>
> >>>>
> >>>> Please feel free to share any additional feedback before then.
> >>>>
> >>>> Best Regards,
> >>>> Raorao
> >>>>
> >>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
> >>>>
> >>>> Hi devs,
> >>>>
> >>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
> >>>> Based On Pipeline Region.
> >>>>
> >>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
> >>>> causes the entire global Checkpoint to abort, leading to degraded
> >>>> checkpoint success rates and wasted compute resources (especially for
> >>>> GPU operators).
> >>>>
> >>>> We propose Regional Checkpoint: when some Regions fail to checkpoint,
> >>>> the framework combines the historical state of the failed Regions with
> >>>> the current state of the healthy Regions to produce a logically complete
> >>>> Completed Checkpoint, while preserving state consistency. The key
> >>>> changes are:
> >>>>
> >>>> 1. Snapshot Collection: Allow partial region failures; combine last
> >>>> successful state of failed Regions with current state of normal
> >>>> Regions.
> >>>>
> >>>> 2. State Correction: New checkpointCoordinatorForRegionFallback
> >>>> interface for OperatorCoordinators to produce consistent snapshots
> >>>> against the mixed view.
> >>>>
> >>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
> >>>> premature cleanup of referenced historical checkpoints.
> >>>>
> >>>> The detailed design is described in the FLIP document:
> >>>>
> >>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
> >>>>
> >>>> Looking forward to your feedback!
> >>>>
> >>>> Best regards,
> >>>>
> >>>> Raorao Xiong
> >>>>
> >>>>
> >>>>
> >>
> >>
>
>
