Hi Roman, thanks for the detailed follow-up.
Rescaling and state redistribution
StateAssignmentOperation runs only once per restore, not once per referenced
checkpoint. The subtask states it receives were already pre-merged when the
Regional Checkpoint completed: each OperatorSubtaskState physically contains
the state handles from whichever checkpoint they originated in. The
refCheckpointId is only metadata for the cleaner; state assignment never
reads it.
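A minimal Python model of the pre-merge step, for illustration only. `SubtaskState` and `pre_merge` are hypothetical stand-ins, not Flink APIs (the real class is the Java `OperatorSubtaskState`), and state handles are modeled as plain strings. The sketch assumes that a declining subtask falls back to the handles it last restored, and tags them with the id of the checkpoint they came from.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class SubtaskState:
    """Hypothetical stand-in for OperatorSubtaskState: handles plus ref metadata."""
    state_handles: Tuple[str, ...]
    ref_checkpoint_id: Optional[int] = None  # None: snapshotted in this checkpoint


def pre_merge(
    parallelism: int,
    acked: Dict[int, Tuple[str, ...]],
    fallback: Dict[int, Tuple[str, ...]],
    fallback_checkpoint_id: int,
) -> Dict[int, SubtaskState]:
    """Build the per-subtask metadata of a Regional Checkpoint at completion time.

    Acknowledged subtasks contribute their fresh handles; declined subtasks
    reuse the handles they last restored from `fallback_checkpoint_id`.
    """
    merged: Dict[int, SubtaskState] = {}
    for index in range(parallelism):
        if index in acked:
            merged[index] = SubtaskState(tuple(acked[index]))
        else:
            merged[index] = SubtaskState(tuple(fallback[index]), fallback_checkpoint_id)
    return merged


# ckp101 from the example: subtask 0 acks, subtask 1 declines.
ckp101 = pre_merge(
    parallelism=2,
    acked={0: ("E0", "E1", "E2", "E3")},
    fallback={0: ("A0", "A1", "B0", "B1"), 1: ("C0", "C1", "D0", "D1")},
    fallback_checkpoint_id=100,
)
# A later restore needs a single pass over these handles, whatever their origin.
restore_input = [ckp101[i].state_handles for i in range(2)]
```

The point of the model is that the mixing happens once, at completion time; a restore only sees one flat set of handles per subtask.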
Example: P=4 → P=2 → P=1, two rounds of downscaling
Initial state (P=4, even-split operator state):
ckp100 (T1, GLOBAL): all 4 regions ack
  subtask 0: [A0, A1]    region 0
  subtask 1: [B0, B1]    region 1
  subtask 2: [C0, C1]    region 2
  subtask 3: [D0, D1]    region 3
First downscale: P=4 → P=2 (recover from ckp100 after failover)
Even-split redistribution:
concatenate: [A0,A1, B0,B1, C0,C1, D0,D1]  → 8 elements
split into 2:
  new subtask 0: [A0, A1, B0, B1]
  new subtask 1: [C0, C1, D0, D1]
Now running at P=2:
ckp101 (T2, REGIONAL):
  new subtask 0 acks: [E0, E1, E2, E3]
  new subtask 1 declines, falls back to its state restored from ckp100 → [C0,C1,D0,D1]  (ref=100)

ckp101 metadata (pre-merged):
  new subtask 0: stateHandles=[E0,E1,E2,E3], refCheckpointId=null
  new subtask 1: stateHandles=[C0,C1,D0,D1], refCheckpointId=100
Second downscale: P=2 → P=1 (recover from ckp101)
Even-split redistribution:
concatenate: [E0,E1,E2,E3, C0,C1,D0,D1]  → 8 elements
split into 1:
  new subtask 0: [E0,E1,E2,E3, C0,C1,D0,D1]
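The two even-split rounds can be replayed with a small Python model. `even_split` is a hypothetical helper that follows the operator-state documentation (concatenate the per-subtask lists in subtask order, then cut the result into contiguous sublists). Giving any remainder to the lowest indices is an assumption of this sketch; it is a simplification, not Flink's actual repartitioner.

```python
from typing import List, Sequence


def even_split(subtask_lists: Sequence[Sequence[str]], new_parallelism: int) -> List[List[str]]:
    """Concatenate all subtask lists, then split into contiguous, near-equal sublists."""
    flat = [element for lst in subtask_lists for element in lst]
    base, extra = divmod(len(flat), new_parallelism)
    result: List[List[str]] = []
    start = 0
    for index in range(new_parallelism):
        # Assumption: the first `extra` subtasks each take one leftover element.
        size = base + (1 if index < extra else 0)
        result.append(flat[start:start + size])
        start += size
    return result


# First downscale, P=4 -> P=2, restoring ckp100.
p2 = even_split([["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]], 2)

# ckp101 keeps subtask 0's fresh handles and subtask 1's fallback handles (ref=100).
ckp101 = [["E0", "E1", "E2", "E3"], p2[1]]

# Second downscale, P=2 -> P=1, restoring ckp101.
p1 = even_split(ckp101, 1)
```

`p1` holds T2 elements (E0..E3) and T1 elements (C0..D1) in a single subtask, exactly as in the walkthrough.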
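At this point, the single subtask’s state contains elements from two points
in time (E0..E3 from T2, C0..D1 from T1). This is safe because of POINTWISE
partition isolation (summary point 2): the two groups describe disjoint
partitions, so no element is present at two different time points. The job
then continues running at P=1: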
ckp102 (T3, REGIONAL):
  new subtask 0 declines, falls back to its state restored from ckp101
  → ref=101
 
ckp102 metadata (pre-merged):
  subtask 0: stateHandles=[E0,E1,E2,E3, C0,C1,D0,D1], refCheckpointId=101
 
ref chain: ckp102(ref=101) → ckp101(ref=100) → ckp100
The ref chain is tracked entirely in the CompletedCheckpoint store metadata. 
Rescaling is a restore operation — it redistributes state handles to TMs but 
doesn't create new checkpoint metadata. The next Regional Checkpoint (ckp102) 
references the previous CompletedCheckpoint (ckp101) from the store, which 
still has its original ref=100 annotation in the metadata.
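To make the cleaner's job concrete, here is a hypothetical sketch of how it could decide which completed checkpoints must stay alive by following refCheckpointId links transitively. The store layout (checkpoint id mapped to each subtask's refCheckpointId) and `retained_checkpoints` are illustrative assumptions, not the CompletedCheckpointStore API.

```python
from typing import Dict, Optional, Set

# Hypothetical store view: checkpoint id -> {subtask index: refCheckpointId or None}.
RefStore = Dict[int, Dict[int, Optional[int]]]


def retained_checkpoints(store: RefStore, latest_id: int) -> Set[int]:
    """Return every checkpoint reachable from `latest_id` through ref links."""
    retained: Set[int] = set()
    pending = [latest_id]
    while pending:
        checkpoint_id = pending.pop()
        if checkpoint_id in retained:
            continue
        retained.add(checkpoint_id)
        for ref in store[checkpoint_id].values():
            if ref is not None:
                pending.append(ref)
    return retained


store: RefStore = {
    100: {0: None, 1: None, 2: None, 3: None},  # global, P=4
    101: {0: None, 1: 100},                     # regional, P=2
    102: {0: 101},                              # regional, P=1
}
```

Here `retained_checkpoints(store, 102)` yields {100, 101, 102}: rescaling never rewrote ckp101's entry, so its link to ckp100 survives. After a global checkpoint with no refs completes, nothing older is reachable from it.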
 
Union state, same scenario:
ckp100 (T1, GLOBAL): P=4, union operator state
  subtask 0: [A0, A1]
  subtask 1: [B0, B1]
  subtask 2: [C0, C1]
  subtask 3: [D0, D1]
 
P=4 → P=2 (union):
  new subtask 0 gets FULL concatenated: [A0,A1, B0,B1, C0,C1, D0,D1]
  new subtask 1 gets FULL concatenated: same
 
ckp101 (T2, REGIONAL):
  new subtask 0 acks: [E0, ..., E7]
  new subtask 1 declines, falls back → [A0,A1,B0,B1,C0,C1,D0,D1]  (ref=100)
 
P=2 → P=1 (union):
  new subtask 0 gets: [E0,...,E7, A0,...,D1]  ← complete set, safe
Union is even simpler — each subtask always gets the complete state, so mixing 
doesn’t add new complexity.
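A matching sketch for union redistribution, under the same simplifications (`union_redistribute` is a hypothetical helper and state handles are strings): every new subtask receives its own copy of the full concatenation of the old lists.

```python
from typing import List, Sequence


def union_redistribute(subtask_lists: Sequence[Sequence[str]], new_parallelism: int) -> List[List[str]]:
    """Give every new subtask its own copy of the complete concatenated list."""
    flat = [element for lst in subtask_lists for element in lst]
    return [list(flat) for _ in range(new_parallelism)]


ckp100 = [["A0", "A1"], ["B0", "B1"], ["C0", "C1"], ["D0", "D1"]]
p2 = union_redistribute(ckp100, 2)  # P=4 -> P=2

# ckp101: subtask 0 acks new elements, subtask 1 falls back to its restored list (ref=100).
ckp101 = [[f"E{i}" for i in range(8)], p2[1]]
p1 = union_redistribute(ckp101, 1)  # P=2 -> P=1
```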
refCheckpointId chain bounded by maxConsecutiveFailures
 
ckp100 (global)
  → ckp101 (regional, ref=100)  consecutive=1
  → ckp102 (regional, ref=101)  consecutive=2
  → ckp103: consecutive >= maxConsecutiveFailures(2)
     → FORCED GLOBAL → if success, chain resets
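At most maxConsecutiveFailures regional checkpoints can complete in a row, so
the reference chain can never grow deeper than maxConsecutiveFailures hops.
Once the forced global checkpoint succeeds, the chain starts afresh, so there
is no inflation beyond the configured limit.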
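The bound can be expressed as a small counter. This is a hypothetical sketch, assuming (as in the diagram above) that the counter grows with each completed regional checkpoint and resets only when a global checkpoint completes; `RegionalCheckpointPolicy` and its methods are illustrative names, not existing Flink classes.

```python
class RegionalCheckpointPolicy:
    """Hypothetical tracker for consecutive regional checkpoints."""

    def __init__(self, max_consecutive_failures: int) -> None:
        self.max_consecutive_failures = max_consecutive_failures
        self.consecutive_regional = 0

    def next_must_be_global(self) -> bool:
        # Once the limit is reached, partial acknowledgements are not accepted.
        return self.consecutive_regional >= self.max_consecutive_failures

    def on_completed(self, regional: bool) -> None:
        # A regional completion extends the ref chain by one hop;
        # a global completion starts a fresh chain.
        self.consecutive_regional = self.consecutive_regional + 1 if regional else 0


policy = RegionalCheckpointPolicy(max_consecutive_failures=2)
policy.on_completed(regional=False)    # ckp100, global
policy.on_completed(regional=True)     # ckp101, consecutive=1
policy.on_completed(regional=True)     # ckp102, consecutive=2
forced = policy.next_must_be_global()  # ckp103 must be global
policy.on_completed(regional=False)    # ckp103 succeeds, chain resets
```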
Channel state redistribution
Each channel state handle carries InputChannelInfo / ResultSubpartitionInfo. 
TaskStateAssignment maps handles by their channel metadata — time of origin is 
irrelevant. Per-subtask channel state and operator state come from the same 
checkpoint snapshot, ensuring internal consistency regardless of which 
checkpoint other subtasks reference.
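A simplified model of this point: channel state handles are routed by the channel they belong to, and the checkpoint they came from is never consulted. The dataclass fields loosely mirror Flink's `InputChannelInfo` (gate index, channel index), but everything here, including the `channel_owner` function that stands in for the real rescaling mappings, is a hypothetical Python illustration.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Callable, Dict, Iterable, List


@dataclass(frozen=True)
class InputChannelInfo:
    gate_idx: int
    input_channel_idx: int


@dataclass(frozen=True)
class ChannelStateHandle:
    info: InputChannelInfo
    checkpoint_id: int  # origin checkpoint; deliberately ignored during assignment
    payload: str


def assign_channel_state(
    handles: Iterable[ChannelStateHandle],
    channel_owner: Callable[[InputChannelInfo], int],
) -> Dict[int, List[ChannelStateHandle]]:
    """Route each handle to the new subtask that owns its channel."""
    assignment: Dict[int, List[ChannelStateHandle]] = defaultdict(list)
    for handle in handles:
        assignment[channel_owner(handle.info)].append(handle)
    return dict(assignment)


handles = [
    ChannelStateHandle(InputChannelInfo(0, 0), checkpoint_id=101, payload="in-flight-0"),
    ChannelStateHandle(InputChannelInfo(0, 1), checkpoint_id=100, payload="in-flight-1"),
]
# Downscaling to one subtask: every channel maps to subtask 0.
result = assign_channel_state(handles, lambda info: 0)
```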
Summary
1.  One StateAssignmentOperation per restore — states are pre-merged
2.  POINTWISE partition isolation makes mixing states across time points safe 
for both even-split and union
3.  maxConsecutiveFailures bounds refCheckpointId chain depth
4.  Channel state redistribution is time-agnostic

I’ll cover all this in the FLIP document.

Thanks again for the detailed replies, looking forward to your feedback.
 
Regards,
Raorao

> On Jun 24, 2026, at 18:38, Roman Khachatryan <[email protected]> wrote:
> 
> Hi Raorao, thanks for your answers
> 
>>> 2. Rescaling for OperatorSubtaskState
> 
>> `refCheckpointId` is checkpoint-level metadata stored in the completed
>> checkpoint; it doesn't participate in state redistribution during
>> rescaling. The `TaskStateAssignment` redistributes only the state handles
>> to new subtasks, while the `CompletedCheckpointStore` retains the original
>> metadata (including `refCheckpointId`) for the cleaner to trace reference
>> chains. So rescaling is inherently compatible.
> 
> Do you mean that we'll run StateAssignmentOperation per every referenced
> checkpoint on every "partial" checkpoint completion?
> (it can be quite heavy)
> 
> I don't think that "redistributes only the state handles to new subtasks"
> part is true - on rescaling, distribution changes for (potentially) every
> sub-task.
> 
> From the docs [1]:
> - Even-split redistribution: Each operator returns a List of state
> elements. The whole state is logically a concatenation of all lists. On
> restore/redistribution, the list is evenly divided into as many sublists as
> there are parallel operators. Each operator gets a sublist, which can be
> empty, or contain one or more elements.
> - Union redistribution: Each operator returns a List of state elements. The
> whole state is logically a concatenation of all lists. On
> restore/redistribution, each operator gets the complete list of state
> elements. ...
> - (we said earlier that keyed state is not supported)
> 
> So I don't see how any of the above can work because we might mix old and
> new states in a sub-task.
> 
> For example:
> - a job is running with parallelism 2 and has even-split state distribution
> - 1st checkpoint is completed by 2/2 sub-tasks
> - 2nd checkpoint is completed by 1/2 sub-tasks: sub-task 2 failed the
> checkpoint; its state in checkpoint 2 refers to "subtask 2 state in
> checkpoint 1"
> - job is rescaled to 1
> - sub-task 1 now has state from checkpoint 1 AND 2?
> 
> Furthermore, with multiple rounds of downscaling, there can be a single
> sub-task referring to multiple historical checkpoints.
> 
> For Union, it's even more problematic.
> 
> Also, what about channel state (Unaligned checkpoint) re-distribution?
> 
> Probably it's better to have a FLIP document with the corresponding section
> first and then discuss it.
> 
> [1]
> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/fault-tolerance/state/#operator-state
> 
> Regards,
> Roman
> 
> 
> On Wed, Jun 24, 2026 at 5:21 AM 熊饶饶 <[email protected]> wrote:
> 
>> Hi Zakelly,
>> 
>> Thanks for the feedback! I completely agree — forcing region-level merging
>> would significantly reduce the effectiveness of file merging, and that's
>> not the right trade-off.
>> 
>> I think TM-level merging can work correctly with Regional Checkpoint if we
>> ensure the FileMergingSnapshotManager properly handles the new checkpoint
>> notification semantics. Specifically:
>> 
>> - The FileMergingSnapshotManager already uses a notification-based
>> lifecycle (checkpoint complete/abort/subsumed) to manage physical file
>> deletion.
>> - For Regional Checkpoint, we need to introduce the new notification type
>> we discussed earlier (notify partially-completed) so that the merging
>> manager knows: this checkpoint is complete but some segments belong to
>> failed regions — keep the physical files that contain those segments alive
>> until the referencing checkpoints are subsumed.
>> - This aligns with the existing design and requires only minor adjustments
>> to the merging manager's notification handling.
>> 
>> The previous suggestion of region-level merging was overly conservative —
>> glad you pointed out the better approach.
>> 
>> Also good to know about the plan to cover channel state in FLIP-306. That
>> will simplify the compatibility matrix significantly.
>> 
>> Best Regards,
>> Raorao
>> 
>>> On Jun 23, 2026, at 23:33, Zakelly Lan <[email protected]> wrote:
>>> 
>>> Hi Raorao,
>>> 
>>> Good to see this proposal and +1 for the direction. Sorry for joining the
>>> discussion late. I have seen many constructive suggestions that largely
>>> align with my thoughts, but I still have one concern:
>>> 
>>> I'm one of the authors of FLIP-306 and I'm not in favor of region-level
>>> merging. IIUC, region-level merge files severely limit the effectiveness of
>>> merging, as merging cannot happen between subtasks. I think it should still
>>> be possible to perform TM-level merge. The only thing we should do is to
>>> keep previous checkpoint files alive when a region checkpoint occurs. This
>>> does not conflict with the current design. It is only necessary to ensure
>>> that the new behavior of checkpoint notifications is compatible with
>>> FLIP-306, or to make some minor adjustments to the merging manager.
>>> 
>>> And BTW to @Rui, we still need more work to let FLIP-306 cover channel
>>> state and deprecate FLINK-26803, and I will look into this soon.
>>> 
>>> 
>>> Best,
>>> Zakelly
>>> 
>>> On Tue, Jun 23, 2026 at 2:40 PM 熊饶饶 <[email protected]> wrote:
>>> 
>>>> Hi Roman, thanks for the follow-up.
>>>> 
>>>> 1. FLIP page
>>>> 
>>>> Thanks for the reminder. I'll create the proper FLIP page before
>>>> starting the [VOTE] thread.
>>>> 
>>>> 2. Rescaling for OperatorSubtaskState
>>>> 
>>>> `refCheckpointId` is checkpoint-level metadata stored in the completed
>>>> checkpoint; it doesn't participate in state redistribution during
>>>> rescaling. The `TaskStateAssignment` redistributes only the state handles
>>>> to new subtasks, while the `CompletedCheckpointStore` retains the original
>>>> metadata (including `refCheckpointId`) for the cleaner to trace reference
>>>> chains. So rescaling is inherently compatible.
>>>> 
>>>> 3. Finished operators
>>>> 
>>>> You're right, my previous answer missed the real issue. For bounded source
>>>> jobs (FLIP-147), if the final checkpoint is Regional and a failed Region's
>>>> subtask misses `notifyCheckpointComplete`, its side effects (e.g., Kafka
>>>> transactions) are never committed, which is data loss.
>>>> 
>>>> Solution: force a global checkpoint when the job is about to terminate.
>>>> When all sources are exhausted, the JM will mark the next checkpoint as
>>>> mandatory global, requiring all regions to ack. If it fails, the job
>>>> retries rather than terminating with a partial snapshot.
>>>> 
>>>> 4. Limitations section
>>>> 
>>>> I agree with you, and I'll add a section covering all limitations:
>>>> 
>>>> BLOCKING/HYBRID edges between Regions -> Auto-disable at runtime
>>>> FLINK-26803 / FLIP-306 -> Warn (future work: region-level merging)
>>>> NO_CLAIM restore mode -> Warn (require a global checkpoint before snapshot
>>>> deletion)
>>>> Changelog state backend -> Reject job submission
>>>> Finished operators (FLIP-147) -> Force a global checkpoint when sources
>>>> are exhausted
>>>> 
>>>> 
>>>> Thanks again for the review, looking forward to your feedback.
>>>> 
>>>> Regards,
>>>> Raorao
>>>> 
>>>> 
>>>>> On Jun 19, 2026, at 22:51, Roman Khachatryan <[email protected]> wrote:
>>>>> 
>>>>> Hi, thanks for your replies and sorry for the delay.
>>>>> 
>>>>> Most of my questions were answered, but I still have some concerns.
>>>>> 
>>>>>> If there are no further concerns by next Monday (June 22), I'll go
>>>>>> ahead and start the [VOTE] thread for this FLIP.
>>>>> 
>>>>> Isn't the actual FLIP still missing? I only saw the Google Document. Do
>>>>> you mind creating a page according to [1]?
>>>>> 
>>>>> ----------------------------------------
>>>>> 
>>>>>> 3. Checkpoint metadata layout
>>>>>> Regional Checkpoint recombines state from different checkpoint IDs. To
>>>>>> track this, we add a refCheckpointId field to OperatorSubtaskState in the
>>>>>> metadata, indicating which historical checkpoint a subtask’s state
>>>>>> references.
>>>>> 
>>>>> Could you explain how we find the right OperatorSubtaskState,
>>>>> especially in case of rescaling?
>>>>> Does the proposal support rescaling?
>>>>> 
>>>>>> 9. Finished operators
>>>>>> The concern is: a finished operator’s final commit notification gets
>>>>>> skipped by Regional Checkpoint, and if this checkpoint is the last one,
>>>>>> the operator never receives it. Could this cause data loss?
>>>>>> In practice, the impact is limited:
>>>>>> ● Failed Region tasks are already gone: By the time the Regional
>>>>>> Checkpoint completes, tasks in the failed Region have already been
>>>>>> restarted (decline) or cancelled (timeout). There is no task left to
>>>>>> receive the notification anyway.
>>>>> Checkpoint failure doesn't necessarily cause a restart (especially if this
>>>>> is limited to one region). The tasks should still be up and running.
>>>>> 
>>>>>> ● maxConsecutiveFailures guarantees a global checkpoint: After reaching
>>>>>> the limit, the next checkpoint is forced to be global, ensuring all tasks
>>>>>> eventually receive notifyCheckpointComplete. We can’t skip the same
>>>>>> Region forever.
>>>>> maxConsecutiveFailures might not be reached for the final checkpoint.
>>>>> 
>>>>>> ● stop-with-savepoint bypasses Regional Checkpoint: When the user stops
>>>>>> the job gracefully, it triggers a full global snapshot, not a Regional
>>>>>> Checkpoint. So the final checkpoint is always complete.
>>>>> stop-with-savepoint should be fine, yes.
>>>>> 
>>>>> To clarify, my concern is about jobs with bounded sources. In such cases,
>>>>> some subtasks might finish processing but still participate in
>>>>> checkpoints. After a successful checkpoint, they are guaranteed to get the
>>>>> checkpoint completion notification, so that they can make side effects
>>>>> visible in external systems (commit Kafka transactions).
>>>>> See FLIP-147 [2]
>>>>> 
>>>>> However, with the current proposal, the job might complete with some
>>>>> subtasks/regions failing the final checkpoint unless I'm missing
>>>>> something.
>>>>> This is essentially data loss.
>>>>> To prevent this, the final checkpoint must always be acked by all
>>>>> subtasks/regions.
>>>>> 
>>>>> ----------------------------------------
>>>>> 
>>>>> There are quite some limitations in this proposal.
>>>>> Could you add a section describing how each of them is handled?
>>>>> 1. Reject job submission
>>>>> 2. Force all-region checkpoint
>>>>> 3. Warn in documentation
>>>>> 
>>>>>> 1. Region independence — BLOCKING/HYBRID edges
>>>>>> You’re right. Our current scope is limited to embarrassingly parallel
>>>>>> regions. In typical ETL scenarios, each parallel pipeline maps to an
>>>>>> independent Region with no edges connecting them.
>>>>> 
>>>>>> 5. SharedStateRegistry — how are old states kept alive?
>>>>>> Good question. In the current design, since we only target embarrassingly
>>>>>> parallel regions, there is typically no keyed state and no incremental
>>>>>> state. As a result, the SharedStateRegistry is generally empty (setting
>>>>>> aside File Merging and Changelog State for now, discussed in point 8), so
>>>>>> keep-alive of files under the shared directory is not a concern.
>>>>> 
>>>>>> 8. FLINK-26803 and FLIP-306 compatibility
>>>>>> This is a very important point. Both features essentially merge small
>>>>>> files at the job level. As Rui Fan pointed out, if the merging granularity
>>>>>> is reduced to the Region level, compatibility with Regional Checkpoint
>>>>>> should be achievable in theory. I think this can be deferred to future
>>>>>> work: once FLINK-26803 is consolidated into FLIP-306, we can revisit and
>>>>>> enable support.
>>>>> 
>>>>>> 10. NO_CLAIM mode warning
>>>>>> You’re absolutely right, this is an important reminder. After restoring
>>>>>> from a Regional Checkpoint, only a successful global checkpoint
>>>>>> guarantees independence from the old state. We’ll add a clear user
>>>>>> warning in the documentation.
>>>>> 
>>>>>> 11. Changelog state backend — not supported
>>>>>> As mentioned earlier, our primary target is embarrassingly parallel
>>>>>> regions, which typically have no keyed state and therefore no slow
>>>>>> incremental state flush issues. I don’t think we need to support the
>>>>>> Changelog state backend for now.
>>>>> 
>>>>> ----------------------------------------
>>>>> 
>>>>>> 2. max-consecutive-failures exceeded: what exactly happens?
>>>>>> The current design says “force a global checkpoint.” To clarify the
>>>>>> two-tier behavior:
>>>>>> ● Tier 1: When consecutiveRegionalCount >= maxConsecutiveFailures, the
>>>>>> next checkpoint is forced to be global.
>>>>>> ● Tier 2: If that forced global checkpoint also fails (any task
>>>>>> declines), the checkpoint is aborted normally (not a job failure). The
>>>>>> counter is then reset since a global checkpoint was attempted, and the
>>>>>> next checkpoint cycle can try again.
>>>>>> This avoids cascading into job failure while ensuring we don’t drift
>>>>>> indefinitely on historical state.
>>>>> 
>>>>> My assumption was that we would not allow this particular failed region to
>>>>> fail the checkpoint again.
>>>>> But forcing a global checkpoint works as well.
>>>>> 
>>>>>> 6. Checkpoint abort notifications & Local Recovery cleanup: new
>>>>>> notification type
>>>>>> This is a very insightful point. Zihao and Gen also raised this in
>>>>>> earlier discussions. The current design doesn’t address state cleanup for
>>>>>> tasks in failed regions. I agree it’s necessary to introduce a new
>>>>>> notification type. For tasks in failed regions, local state cleanup can
>>>>>> be deferred until the next checkpoint trigger.
>>>>> Ok, this can be some future work.
>>>>> 
>>>>>> 7. Task that neither acknowledges nor declines: per-region timeouts
>>>>>> This was discussed in the previous thread. Network issues may cause a
>>>>>> task to neither ack nor decline in time. In such cases, we treat it as a
>>>>>> checkpoint timeout: the affected tasks’ region is marked as failed, and
>>>>>> the process ultimately falls through to the normal Regional Checkpoint
>>>>>> processing logic.
>>>>> Ok, this can be some future work.
>>>>> 
>>>>> ----------------------------------------
>>>>> 
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65145551#FlinkImprovementProposals-CreateyourOwnFLIP
>>>>> 
>>>>> [2]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
>>>>> 
>>>>> Regards,
>>>>> Roman
>>>>> 
>>>>> 
>>>>> On Wed, Jun 17, 2026 at 9:56 AM 熊饶饶 <[email protected]> wrote:
>>>>> 
>>>>>> Hi all,
>>>>>> 
>>>>>> Thanks everyone for the valuable feedback. I believe all the points raised
>>>>>> above have been addressed (@Roman @Rui Fan). If there are no further
>>>>>> concerns by next Monday (June 22), I'll go ahead and start the [VOTE]
>>>>>> thread for this FLIP.
>>>>>> 
>>>>>> For reference, the earlier related discussion can be found here:
>>>>>> https://lists.apache.org/thread/qpztk0jdpcmhomszjx63l53xv26xnmwf
>>>>>> 
>>>>>> 
>>>>>> Please feel free to share any additional feedback before then.
>>>>>> 
>>>>>> Best Regards,
>>>>>> Raorao
>>>>>> 
>>>>>> On May 27, 2026, at 16:31, 熊饶饶 <[email protected]> wrote:
>>>>>> 
>>>>>> Hi devs,
>>>>>> 
>>>>>> I would like to start a discussion on FLIP-XXX: Independent Checkpoint
>>>>>> Based On Pipeline Region.
>>>>>> 
>>>>>> In high-parallelism streaming jobs, a single Task's checkpoint failure
>>>>>> causes the entire global Checkpoint to abort, leading to degraded
>>>>>> checkpoint success rates and wasted compute resources (especially for GPU
>>>>>> operators).
>>>>>> 
>>>>>> We propose Regional Checkpoint: when some Regions fail to checkpoint, the
>>>>>> framework combines the historical state of the failed Regions with the
>>>>>> current state of the healthy Regions to produce a logically complete
>>>>>> Completed Checkpoint, while preserving state consistency. The key changes
>>>>>> are:
>>>>>> 
>>>>>> 1. Snapshot Collection: Allow partial region failures; combine the last
>>>>>> successful state of failed Regions with the current state of normal
>>>>>> Regions.
>>>>>> 
>>>>>> 2. State Correction: A new checkpointCoordinatorForRegionFallback
>>>>>> interface for OperatorCoordinators to produce consistent snapshots
>>>>>> against the mixed view.
>>>>>> 
>>>>>> 3. Checkpoint Store: Track ref_checkpoint_id in metadata to prevent
>>>>>> premature cleanup of referenced historical checkpoints.
>>>>>> 
>>>>>> The detailed design is described in the FLIP document:
>>>>>> 
>>>>>> https://docs.google.com/document/d/153r9NjHN9xgFUBdZ8sNX6YjUWTREtDMv5i-JaMdE6NU/edit?usp=sharing
>>>>>> 
>>>>>> Looking forward to your feedback!
>>>>>> 
>>>>>> Best regards,
>>>>>> 
>>>>>> Raorao Xiong
>>>>>> 
>>>>>> 
>>>>>> 
>>>> 
>>>> 
>> 
>> 
