[ 
https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17436699#comment-17436699
 ] 

Piotr Nowojski commented on FLINK-24611:
----------------------------------------

{quote}
As for the removal of unused state after JM failover, I think it actually would 
be a regression: currently, JM discards all the state on abort/suspend 
immediately. Or do you mean, that state artifacts may be left in general 
currently?
{quote}
I think there might be leftover artifacts currently, but wouldn't this be 
significantly more likely when keeping pending/non-confirmed files (as the 
changelog backend requires)?

> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
>                 Key: FLINK-24611
>                 URL: https://issues.apache.org/jira/browse/FLINK-24611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> When a checkpoint is aborted, JM discards any state that was sent to it and 
> wasn't used in other checkpoints. This forces incremental state backends to 
> wait for confirmation from JM before re-using this state. For the changelog 
> backend this is even more critical.
> One approach proposed was to make backends/TMs responsible for their state 
> for as long as it's not shared with other TMs, i.e. until rescaling 
> (private/shared state ownership track: FLINK-23342 and more).
>  However, that approach is quite invasive.
>  
> An alternative solution would be:
>  1. SharedStateRegistry remembers the latest checkpoint for each shared state 
> (instead of the usage count, as it does currently)
>  2. CompletedCheckpointStore notifies it about the lowest valid checkpoint 
> (on subsumption)
>  3. SharedStateRegistry then discards any state associated with the lower 
> (subsumed/aborted) checkpoints
>  So the aborted checkpoint can only be discarded after some subsequent 
> successful checkpoint (which can mark state as used).
> Only JM code is changed.
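
Steps 1-3 above can be illustrated with a minimal sketch. It is not Flink's 
SharedStateRegistry: the class name LastUseRegistry, its method names, and the 
use of plain String keys are assumptions made for illustration only. Each entry 
stores the ID of the latest checkpoint that registered it, and entries are 
dropped only once the lowest valid checkpoint has moved past that ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of "latest checkpoint per shared state"; not Flink code. */
final class LastUseRegistry {
    // state key -> ID of the latest checkpoint (completed or aborted) that registered it
    private final Map<String, Long> lastUsedBy = new HashMap<>();

    /** Step 1: remember the latest checkpoint that uses this state. */
    void register(String stateKey, long checkpointId) {
        lastUsedBy.merge(stateKey, checkpointId, Math::max);
    }

    /**
     * Steps 2-3: called on subsumption with the lowest checkpoint ID that is
     * still valid. Returns the keys whose state can now be discarded.
     */
    List<String> unregisterUnused(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUsedBy.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < lowestValidCheckpointId) {
                discarded.add(entry.getKey());
                it.remove();
            }
        }
        return discarded;
    }
}
```

In this sketch, state reported only by an aborted checkpoint 1 stays registered 
until a later checkpoint completes and subsumes it, for example via 
unregisterUnused(2).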
>  
> Implementation considerations.
> On subsumption, JM needs to find all the unused state and discard it.
>  This can either be done by
>  1) simply traversing all entries; or by 
>  2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long, 
> List<K>>). This allows skipping unnecessary traversal at the cost of higher 
> memory usage.
> In both cases:
>  - each entry stores the ID of the last checkpoint it was used in (long)
>  - key is hashed (even with plain traversal, map.entrySet.iterator.remove() 
> computes hash internally)
> Given the following constraints:
>  - 10M state entries at most
>  - 10 (retained) checkpoints at most
>  - 10 checkpoints per second at most
>  - state entry key takes 32B (usually UUID or two UUIDs)
> The extra space for (2) would be on the order of 10M * 32B = 320MB (about 305MiB).
>  The extra time for (1) would be on the order of 10M entries * 10 checkpoints 
> per second * the ratio of outdated entries per checkpoint. Depending on the 
> ratio and the hardware, this could take up to hundreds of ms per second, 
> blocking the main thread.
>  So approach (2) seems reasonable.
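
Approach (2) could look roughly like the sketch below. It is illustrative 
only: IndexedLastUseRegistry and its methods are hypothetical names, keys are 
plain Strings, and the per-checkpoint index holds a Set rather than the List 
mentioned above so that moving a key between checkpoints stays cheap. On 
subsumption, only the index entries below the lowest valid checkpoint are 
visited, so live entries are never traversed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical sketch of approach (2): a per-checkpoint index of entries. */
final class IndexedLastUseRegistry {
    // state key -> ID of the last checkpoint it was used in
    private final Map<String, Long> lastUsedBy = new HashMap<>();
    // checkpoint ID -> keys whose last use is exactly that checkpoint
    private final TreeMap<Long, Set<String>> keysByCheckpoint = new TreeMap<>();

    void register(String stateKey, long checkpointId) {
        Long previous = lastUsedBy.get(stateKey);
        if (previous != null && previous >= checkpointId) {
            return; // already tracked under the same or a later checkpoint
        }
        if (previous != null) {
            // move the key out of the older checkpoint's bucket
            Set<String> oldBucket = keysByCheckpoint.get(previous);
            oldBucket.remove(stateKey);
            if (oldBucket.isEmpty()) {
                keysByCheckpoint.remove(previous);
            }
        }
        lastUsedBy.put(stateKey, checkpointId);
        keysByCheckpoint.computeIfAbsent(checkpointId, id -> new HashSet<>()).add(stateKey);
    }

    /** Discards everything last used by a checkpoint below the given ID. */
    List<String> unregisterUnused(long lowestValidCheckpointId) {
        // headMap is a live view of all buckets with ID < lowestValidCheckpointId
        SortedMap<Long, Set<String>> outdated = keysByCheckpoint.headMap(lowestValidCheckpointId);
        List<String> discarded = new ArrayList<>();
        for (Set<String> keys : outdated.values()) {
            for (String key : keys) {
                lastUsedBy.remove(key);
                discarded.add(key);
            }
        }
        outdated.clear(); // clearing the view removes the buckets from the index
        return discarded;
    }
}
```

The cost of a subsumption in this sketch is proportional to the number of 
discarded entries rather than to the total number of entries, which is the 
trade-off against the extra memory for the index.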
>  
> The following cases shouldn't pose any difficulties:
>  1. Recovery, re-scaling, and state used by only some or by none of the 
> tasks - we still register all states on recovery even after 
> FLINK-22483/FLINK-24086
>  2. PlaceholderStreamStateHandles
>  3. Cross-task state sharing - not an issue as long as everything is managed 
> by JM
>  4. Dependencies between SharedStateRegistry and CompletedCheckpointStore - 
> simple after FLINK-24086
>  5. Multiple concurrent checkpoints (below)
> Consider the following case:
> (nr. concurrent checkpoints > 1)
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets 
> aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
> When a checkpoint finishes, all pending checkpoints before it are aborted - 
> but not savepoints.
> Savepoints currently are NOT incremental. And in the future, incremental 
> savepoints shouldn't share any artifacts with checkpoints.
> The following should be kept in mind:
>  1. On job cancellation, state of aborted checkpoints should be cleaned up 
> explicitly
>  2. Savepoints should be ignored and not change 
> CheckpointStore.lowestCheckpointID
>  
> For end users, this change might show up as a delay in discarding the state 
> of aborted checkpoints, which seems acceptable.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
