[ https://issues.apache.org/jira/browse/FLINK-24611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17436699#comment-17436699 ]
Piotr Nowojski commented on FLINK-24611:
----------------------------------------

{quote}
As for the removal of unused state after JM failover, I think it actually would be a regression: currently, JM discards all the state on abort/suspend immediately. Or do you mean that state artifacts may be left over in general currently?
{quote}

I think there might be leftover artefacts currently, but wouldn't that be significantly more likely if pending/non-confirmed files are kept (as the changelog backend requires)?

> Prevent JM from discarding state on checkpoint abortion
> -------------------------------------------------------
>
>                 Key: FLINK-24611
>                 URL: https://issues.apache.org/jira/browse/FLINK-24611
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.15.0
>            Reporter: Roman Khachatryan
>            Priority: Major
>             Fix For: 1.15.0
>
>
> When a checkpoint is aborted, JM discards any state that was sent to it and
> wasn't used in other checkpoints. This forces incremental state backends to
> wait for confirmation from JM before re-using this state. For the changelog
> backend this is even more critical.
> One approach proposed was to make backends/TMs responsible for their state
> as long as it is not shared with other TMs, i.e. until rescaling
> (private/shared state ownership track: FLINK-23342 and more).
> However, that approach is quite invasive.
>
> An alternative solution would be:
> 1. SharedStateRegistry remembers the latest checkpoint for each shared state
> (instead of the usage count, as it does currently)
> 2. CompletedCheckpointStore notifies it about the lowest valid checkpoint
> (on subsumption)
> 3. SharedStateRegistry then discards any state associated with the lower
> (subsumed/aborted) checkpoints
> So the state of an aborted checkpoint can only be discarded after some
> subsequent successful checkpoint (which can mark that state as used).
> Only JM code needs to change.
>
> Implementation considerations.
> On subsumption, JM needs to find all the unused state and discard it.
> This can be done either by
> 1) simply traversing all entries; or by
> 2) maintaining a set of entries per checkpoint (e.g. SortedMap<Long,
> List<K>>). This avoids unnecessary traversal at the cost of higher
> memory usage.
> In both cases:
> - each entry stores the last checkpoint ID it was used in (long)
> - the key is hashed (even with plain traversal,
> map.entrySet().iterator().remove() computes the hash internally)
> Given the following constraints:
> - 10M state entries at most
> - 10 (retained) checkpoints at most
> - 10 checkpoints per second at most
> - a state entry key takes 32B (usually a UUID or two UUIDs)
> The extra space for (2) would be on the order of 10M*32B=320MB.
> The extra time for (1) would be on the order of 10M * 10 checkpoints per
> second * ratio of outdated entries per checkpoint. Depending on the ratio and
> the hardware, this could take up to hundreds of ms per second, blocking the
> main thread.
> So approach (2) seems reasonable.
>
> The following cases shouldn't pose any difficulties:
> 1. Recovery, rescaling, and state used by only some or by no tasks - we still
> register all states on recovery even after FLINK-22483/FLINK-24086
> 2. PlaceholderStreamStateHandles
> 3. Cross-task state sharing - not an issue as long as everything is managed
> by JM
> 4. Dependencies between SharedStateRegistry and CompletedCheckpointStore -
> simple after FLINK-24086
> 5. Multiple concurrent checkpoints (below)
> Consider the following case (number of concurrent checkpoints > 1):
> 1. checkpoint 1 starts, TM reports that it uses file f1; checkpoint 1 gets
> aborted - f1 is now tracked
> 2. savepoint 2 starts, it *will* use f1
> 3. checkpoint 3 starts and finishes; it does NOT use file f1
> When a checkpoint finishes, all pending checkpoints before it are aborted -
> but not savepoints.
> Savepoints currently are NOT incremental.
> And in the future, incremental savepoints shouldn't share any artifacts
> with checkpoints.
> The following should be kept in mind:
> 1. On job cancellation, the state of aborted checkpoints should be cleaned up
> explicitly
> 2. Savepoints should be ignored and should not change
> CheckpointStore.lowestCheckpointID
>
> For end users, this change might show up as a delay in discarding the state
> of aborted checkpoints, which seems acceptable.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
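To make the alternative solution and approach (2) from the description more concrete, here is a minimal, hypothetical Java sketch. It is not Flink's SharedStateRegistry API. The class LastUsageRegistrySketch, its methods register, discardBelow and trackedKeys, and the use of String keys in place of real shared state keys are all assumptions made for illustration. For each key, the sketch records the ID of the latest checkpoint that used it. It also keeps a sorted index from checkpoint ID to keys, so on subsumption it visits only the index entries below the lowest valid checkpoint. Savepoints are not modeled, which matches the assumption that they never share artifacts with checkpoints. "Discarding" just returns the keys instead of deleting any files.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical sketch of approach (2); not Flink's SharedStateRegistry API.
 * String keys stand in for shared state keys (e.g. file names or UUIDs).
 */
final class LastUsageRegistrySketch {

    // Key -> ID of the latest checkpoint that reported using the key.
    private final Map<String, Long> lastUsedIn = new HashMap<>();

    // Checkpoint ID -> keys registered under that checkpoint. A key may linger
    // here after a later checkpoint re-registers it; such stale entries are
    // skipped in discardBelow().
    private final NavigableMap<Long, List<String>> keysByCheckpoint = new TreeMap<>();

    /** A checkpoint (pending, completed or later aborted) reports using a key. */
    void register(String key, long checkpointId) {
        Long previous = lastUsedIn.get(key);
        if (previous != null && previous >= checkpointId) {
            return; // The same or a later checkpoint is already recorded (e.g. concurrent checkpoints).
        }
        lastUsedIn.put(key, checkpointId);
        keysByCheckpoint.computeIfAbsent(checkpointId, id -> new ArrayList<>()).add(key);
    }

    /**
     * Called on subsumption with the lowest checkpoint ID that is still valid.
     * Returns the keys whose latest usage is below it; a real registry would
     * delete the corresponding state at this point.
     */
    List<String> discardBelow(long lowestValidCheckpointId) {
        List<String> discarded = new ArrayList<>();
        // Live view of all index entries strictly below the lowest valid checkpoint.
        NavigableMap<Long, List<String>> outdated =
                keysByCheckpoint.headMap(lowestValidCheckpointId, false);
        for (Map.Entry<Long, List<String>> entry : outdated.entrySet()) {
            for (String key : entry.getValue()) {
                // Discard only if this checkpoint is still the key's latest usage.
                if (entry.getKey().equals(lastUsedIn.get(key))) {
                    lastUsedIn.remove(key);
                    discarded.add(key);
                }
            }
        }
        outdated.clear(); // Clearing the view also removes these entries from keysByCheckpoint.
        return discarded;
    }

    /** Number of keys currently tracked (for illustration only). */
    int trackedKeys() {
        return lastUsedIn.size();
    }
}
```

Example: f1 is reported only by aborted checkpoint 1, and f2 is reported by checkpoints 1 and 3. Once checkpoint 3 is the lowest valid one, discardBelow(3) returns only f1, because f2's stale index entry under checkpoint 1 is skipped. If f2 is later re-reported by a concurrent checkpoint 2, checkpoint 3 remains its latest usage. When a key is re-registered, the sketch leaves the older index entry in place instead of removing it from a list. This keeps register() cheap at the cost of some extra memory, which is the same kind of space-for-time trade-off the description attributes to approach (2).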