I would like to bring this topic up one more time. I put some more thought
into it and created FLIP-270 [1] as a follow-up of FLIP-194 [2] with an
updated version of what I summarized in my previous email. It would be
interesting to get some additional perspectives on this; more specifically,
the two included proposals about either just repurposing the
CompletedCheckpointStore into a more generic CheckpointStore or refactoring
the StateHandleStore interface moving all the cleanup logic from the
CheckpointsCleaner and StateHandleStore into what's currently called
CompletedCheckpointStore.

Looking forward to feedback on that proposal.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-270%3A+Repeatable+Cleanup+of+Checkpoints
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore

On Wed, Sep 28, 2022 at 4:07 PM Matthias Pohl <matthias.p...@aiven.io>
wrote:

> Hi everyone,
>
> I’d like to start a discussion on repeatable cleanup of checkpoint data.
> In FLIP-194 [1] we introduced repeatable cleanup of HA data along the
> introduction of the JobResultStore component. The goal was to make Flink
> being in charge of cleanup for the data it owns. The Flink cluster should
> only shutdown gracefully after all its artifacts are removed. That way, one
> would not miss abandoned artifacts accidentally.
>
> We forgot to cover one code path around cleaning up checkpoint data.
> Currently, in case of an error (e.g. permission issues), checkpoints are
> tried to be cleaned up in the CheckpointsCleaner and left like that if
> that cleanup failed. A log message is printed. The user would be
> responsible for cleaning up the data. This was discussed as part of the
> release testing efforts for Flink 1.15 in FLINK-26388 [2].
>
> We could add repeatable cleanup in the CheckpointsCleaner. We would have
> to make sure that all StateObject#discardState implementations are
> idempotent. This is not necessarily the case right now (see FLINK-26606
> [3]).
>
> Additionally, there is the problem of losing information about what
> Checkpoints are subject to cleanup in case of JobManager failovers. These
> Checkpoints are not stored as part of the HA data. Additionally,
> PendingCheckpoints are not serialized in any way, either. None of these
> artifacts are picked up again after a failover. I see the following options
> here:
>
>    -
>
>    The purpose of CompletedCheckpointStore needs to be extended to become
>    a “general” CheckpointStore. It will store PendingCheckpoints and
>    CompletedCheckpoints that are marked for deletion. After a failover,
>    CheckpointsCleaner can pick up these instances again and continue with
>    the deletion process.
>
> The flaw of that approach is that we’re increasing the amount of data that
> is stored in the underlying StateHandleStore. Additionally, we’re going
> to have an increased number of accesses to the CompletedCheckpointStore.
> These accesses need to happen in the main thread; more specifically, adding
> PendingCheckpoints and marking Checkpoints for deletion.
>
>    -
>
>    We’re actually interested in cleaning up artifacts from the
>    FileSystem, i.e. the artifacts created by the StateHandleStore used
>    within the DefaultCompletedCheckpointStore containing the serialized
>    CompletedCheckpoint instance and the checkpoint’s folder containing
>    the actual operator states. We could adapt the CompletedCheckpointStore
>    in a way that any Checkpoint (including PendingCheckpoint) is
>    serialized and persisted on the FileSystem right away (which is currently
>    done within the StateHandleStore implementations when adding
>    CompletedCheckpoints to the underlying HA system). The corresponding
>    FileStateHandleObject (referring to that serialized CompletedCheckpoint)
>    that gets persisted to ZooKeeper/k8s ConfigMap in the end would be only
>    written if the CompletedCheckpoint is finalized and can be used. The
>    CheckpointsCleaner could recover any artifacts from the FileSystem and
>    cleanup anything that’s not listed in ZooKeeper/k8s ConfigMap.
>
> This approach saves us from accessing the HA backend (i.e. ZooKeeper/k8s)
> but would require additional IO on the FileSystem, still. It would require
> some larger refactoring, though: The RetrievableStateHandle currently being
> handled internally (i.e. in the StateHandleStore) need to become public.
>
>    -
>
>    We just add repeatable cleanup to the CheckpointsCleaner as is. No
>    cleanup is picked up again after recovery. This could be a fallback for the
>    user to reduce IO.
>
>
> I’m interested in initial opinions from the community on that matter
> before starting to work on a final FLIP.
>
> Best,
>
> Matthias
>
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-194%3A+Introduce+the+JobResultStore
>
> [2] https://issues.apache.org/jira/browse/FLINK-26388
>
> [3] https://issues.apache.org/jira/browse/FLINK-26606
>

Reply via email to