[ 
https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17435555#comment-17435555
 ] 

Stephan Ewen commented on FLINK-24611:
--------------------------------------

I would be curious to learn your ideas on how to solve _"On job cancellation, 
state of aborted checkpoints should be cleaned up explicitly"_.

The way I understand it, this is actually fundamentally difficult, because of 
the situation described below.

*BTW: I think this complexity exists regardless of whether we change the way
the SharedStateRegistry works, or whether we introduce TM-owned state. This
complexity comes purely from the fact that we want to allow checkpoints to
reference state from a previous checkpoint that wasn't yet notified as
complete.*

When the Checkpoint Coordinator is suspended and wants to discard the state of
the aborted checkpoint X, we need to make sure that no Task assumes that this
state exists when creating checkpoint X+1 (possibly after the Checkpoint
Coordinator has been resumed, potentially on a different leader).

Previously, the contract was that Tasks could never reference state from
checkpoints that were not yet confirmed, but we are explicitly changing this. So
we need to be very careful not to remove state that someone might still assume
is there.
For example, when a regional failover happens, the Checkpoint Coordinator is
suspended until recovery is complete and pending checkpoints are aborted. But
some tasks keep running and will assume that they can build on the previous
state.

Because of that, we cannot always delete state from pending checkpoints (state
that is new and wasn't previously referenced) when the Checkpoint Coordinator is
suspended.

I think we can do something like this:

(1) We trigger cleanup of aborted checkpoints' state only when the job is
actually terminated (cancelled, finished, terminally failed), i.e. when no
recovery by the same job will be attempted any more. I think the current API
gives us the reason why the Checkpoint Coordinator is suspended, so we could
use that to realize this behavior.
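
As a minimal sketch of that idea (the names below are illustrative, not the
actual Flink classes; in particular the shutdown-reason enum is an assumption):

{code:java}
// Hypothetical sketch: discard the state of aborted checkpoints only when the
// job has reached a terminal state; a suspension for recovery (e.g. regional
// failover or loss of JM leadership) must keep the state around.
enum ShutdownReason { CANCELED, FINISHED, FAILED_TERMINALLY, SUSPENDED_FOR_RECOVERY }

final class AbortedStateCleanupPolicy {

    /** Returns true if the state of aborted checkpoints may be deleted. */
    static boolean mayDiscardAbortedState(ShutdownReason reason) {
        switch (reason) {
            case CANCELED:
            case FINISHED:
            case FAILED_TERMINALLY:
                // no recovery attempt by the same job any more
                return true;
            case SUSPENDED_FOR_RECOVERY:
            default:
                // running tasks may still build on this state
                return false;
        }
    }
}
{code}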

(2) We need a way to catch state for which we lost the reference around a failover.

For JM failover, we will for sure lose state references:
  - latest completed checkpoint is X
  - in progress is checkpoint X+1, with some state acked already
  - JM failure
  - JM recovery restores registry from checkpoint X, all tasks restore from 
checkpoint X
  - we have shared state artifacts from checkpoint X+1 that no one has a 
reference to.

My best guess for how to solve this is with a GC daemon that periodically lists
the shared state directory and finds files that are no longer referenced in the
shared state registry.
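
A rough sketch of such a GC pass, assuming the shared state registry can expose
the set of file names it still references (that accessor is an assumption, not
an existing API), and with a grace period so that we don't delete files of a
checkpoint that is still being written and not yet registered:

{code:java}
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.Set;
import java.util.stream.Stream;

// Hypothetical periodic GC pass over the shared-state directory.
final class SharedStateGc {

    /** Deletes files that no registry entry references and that are older than the grace period. */
    static void collect(Path sharedStateDir, Set<String> referencedFileNames, Duration gracePeriod)
            throws IOException {
        Instant cutoff = Instant.now().minus(gracePeriod);
        try (Stream<Path> files = Files.list(sharedStateDir)) {
            files.filter(f -> !referencedFileNames.contains(f.getFileName().toString()))
                    // keep very recent files: they may belong to an in-progress
                    // checkpoint whose handles are not registered yet
                    .filter(f -> isOlderThan(f, cutoff))
                    .forEach(SharedStateGc::deleteQuietly);
        }
    }

    private static boolean isOlderThan(Path file, Instant cutoff) {
        try {
            return Files.getLastModifiedTime(file).toInstant().isBefore(cutoff);
        } catch (IOException e) {
            return false; // if in doubt, keep the file
        }
    }

    private static void deleteQuietly(Path file) {
        try {
            Files.deleteIfExists(file);
        } catch (IOException ignored) {
            // best effort; the next GC run will retry
        }
    }
}
{code}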

BTW: (2) is an existing problem, so this wouldn't be a regression.

Curious to hear other thoughts on this as well.

> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
>                 Key: FLINK-24611
>                 URL: https://issues.apache.org/jira/browse/FLINK-24611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> When a checkpoint is aborted, JM discards any state that was sent to it and
> wasn't used in other checkpoints. This forces incremental state backends to
> wait for confirmation from JM before re-using this state. For the changelog
> backend this is even more critical.
> One approach proposed was to make backends/TMs responsible for their state
> for as long as it is not shared with other TMs, i.e. until rescaling
> (private/shared state ownership track: FLINK-23342 and more).
>  However, that approach is quite invasive.
>  
> An alternative solution would be:
>  1. SharedStateRegistry remembers the latest checkpoint for each shared state
> (instead of the current usage count)
>  2. CompletedCheckpointStore notifies it about the lowest valid checkpoint 
> (on subsumption)
>  3. SharedStateRegistry then discards any state associated with the lower 
> (subsumed/aborted) checkpoints
>  So the state of an aborted checkpoint can only be discarded after some
> subsequent successful checkpoint (which can mark the state as used).
> Only JM code is changed.
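>
> To make steps 1-3 concrete, a minimal sketch of this registry contract
> (illustrative names, not the actual SharedStateRegistry API; this naive
> version traverses all entries and corresponds to option (1) discussed below):
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> // Each shared-state entry remembers the latest checkpoint that used it; on
> // subsumption, everything last used before the lowest valid checkpoint is
> // discarded.
> final class LatestCheckpointSharedStateRegistry<K> {
>
>     private final Map<K, Long> latestUsedIn = new HashMap<>();
>
>     /** Step 1: a checkpoint reports that it uses the given shared state key. */
>     void registerUse(K stateKey, long checkpointId) {
>         latestUsedIn.merge(stateKey, checkpointId, Math::max);
>     }
>
>     /** Steps 2+3: the CompletedCheckpointStore reports the lowest valid checkpoint. */
>     List<K> discardBelow(long lowestValidCheckpointId) {
>         List<K> toDiscard = new ArrayList<>();
>         latestUsedIn.entrySet().removeIf(e -> {
>             boolean unused = e.getValue() < lowestValidCheckpointId;
>             if (unused) {
>                 toDiscard.add(e.getKey());
>             }
>             return unused;
>         });
>         return toDiscard; // the caller deletes the underlying artifacts
>     }
> }
> {code}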
>  
> Implementation considerations.
> On subsumption, JM needs to find all the unused state and discard it.
>  This can either be done by
>  1) simply traversing all entries; or by 
>  2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long, 
> List<K>>). This allows skipping the unnecessary traversal at the cost of
> higher memory usage.
> In both cases:
>  - each entry stores last checkpoint ID it was used in (long)
>  - key is hashed (even with plain traversal, map.entrySet.iterator.remove() 
> computes hash internally)
> Given the following constraints:
>  - 10M state entries at most
>  - 10 (retained) checkpoints at most
>  - 10 checkpoints per second at most
>  - state entry key takes 32B (usually UUID or two UUIDs)
> The extra space for (2) would be on the order of 10M * 32B = 320MB.
>  The extra time for (1) would be on the order of 10M entries * 10 checkpoints
> per second * the ratio of outdated entries per checkpoint. Depending on the ratio and the
> hardware, this could take up to hundreds of ms per second, blocking the main 
> thread.
>  So approach (2) seems reasonable.
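>
> A sketch of approach (2), again with illustrative names: the per-checkpoint
> index lets subsumption touch only the outdated entries instead of scanning
> the whole map.
> {code:java}
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.List;
> import java.util.Map;
> import java.util.NavigableMap;
> import java.util.Set;
> import java.util.TreeMap;
>
> final class IndexedSharedStateRegistry<K> {
>
>     // entry -> last checkpoint it was used in
>     private final Map<K, Long> latestUsedIn = new HashMap<>();
>     // checkpoint -> entries whose latest use is that checkpoint
>     private final TreeMap<Long, Set<K>> entriesByCheckpoint = new TreeMap<>();
>
>     void registerUse(K stateKey, long checkpointId) {
>         Long previous = latestUsedIn.put(stateKey, checkpointId);
>         if (previous != null && previous != checkpointId) {
>             // move the entry out of the index bucket of its previous checkpoint
>             Set<K> old = entriesByCheckpoint.get(previous);
>             if (old != null) {
>                 old.remove(stateKey);
>                 if (old.isEmpty()) {
>                     entriesByCheckpoint.remove(previous);
>                 }
>             }
>         }
>         entriesByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(stateKey);
>     }
>
>     /** Discards all entries whose latest use is below the lowest valid checkpoint. */
>     List<K> discardBelow(long lowestValidCheckpointId) {
>         List<K> toDiscard = new ArrayList<>();
>         // headMap view: only the subsumed checkpoints are visited
>         NavigableMap<Long, Set<K>> outdated =
>                 entriesByCheckpoint.headMap(lowestValidCheckpointId, false);
>         for (Set<K> keys : outdated.values()) {
>             for (K key : keys) {
>                 latestUsedIn.remove(key);
>                 toDiscard.add(key);
>             }
>         }
>         outdated.clear(); // also removes these buckets from the backing TreeMap
>         return toDiscard; // the caller deletes the underlying artifacts
>     }
> }
> {code}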
>  
> The following cases shouldn't pose any difficulties:
>  1. Recovery, re-scaling, and state used by only some tasks or by no tasks -
> we still register all states on recovery even after FLINK-22483/FLINK-24086
>  2. PlaceholderStreamStateHandles
>  3. Cross-task state sharing - not an issue as long as everything is managed 
> by JM
>  4. Dependencies between SharedStateRegistry and CompletedCheckpointStore - 
> simple after FLINK-24086
>  5. Multiple concurrent checkpoints (below)
> Consider the following case:
> (nr. concurrent checkpoints > 1)
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets 
> aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
> When a checkpoint finishes, all pending checkpoints before it are aborted - 
> but not savepoints.
> Savepoints are currently NOT incremental. And in the future, incremental
> savepoints shouldn't share any artifacts with checkpoints.
> The following should be kept in mind:
>  1. On job cancellation, state of aborted checkpoints should be cleaned up 
> explicitly
>  2. Savepoints should be ignored and not change 
> CheckpointStore.lowestCheckpointID
>  
> For the end users, this change might manifest as a delay in discarding the
> state of aborted checkpoints, which seems acceptable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
