[ 
https://issues.apache.org/jira/browse/FLINK-18263?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17139250#comment-17139250
 ] 

Mark Cho commented on FLINK-18263:
----------------------------------

{code:java}
Another case it would help is to move a running job if users do not want to 
take a savepoint but just want to reuse the periodical external checkpoints.
{code}
That is what we typically do and what we want to do for this case as well. 
However, in this case, no external checkpoints that we can use to redeploy due 
to "FINISHED" state causing the JM to delete all the external checkpoints.

By calling `isEndOfStream(...)` in KafkaDeserializationSchema, that puts the 
Kafka source into a "FINISHED" state. In this specific case, the job is put 
into "FINISHED" state as the current deployment (with current config) does not 
want process those records, so we have an external controller that will 
redeploy this job with some config changed that can process those records.

In this redeployment, we would like to deploy from the checkpoint so we resume 
from the offsets stored in the checkpoint and also restore other state that are 
in the checkpoint.

> Allow external checkpoints to be persisted even when the job is in "Finished" 
> state.
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-18263
>                 URL: https://issues.apache.org/jira/browse/FLINK-18263
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Checkpointing
>            Reporter: Mark Cho
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, `execution.checkpointing.externalized-checkpoint-retention` 
> configuration supports two options:
> - `DELETE_ON_CANCELLATION` which keeps the externalized checkpoints in FAILED 
> and SUSPENDED state.
> - `RETAIN_ON_CANCELLATION` which keeps the externalized checkpoints in 
> FAILED, SUSPENDED, and CANCELED state.
> This gives us control over the retention of externalized checkpoints in all 
> terminal state of a job, except for the FINISHED state.
> If the job ends up in "FINISHED" state, externalized checkpoints will be 
> automatically cleaned up and there currently is no config that will ensure 
> that these externalized checkpoints to be persisted.
> I found an old Jira ticket FLINK-4512 where this was discussed. I think it 
> would be helpful to have a config that can control the retention policy for 
> FINISHED state as well.
> - This can be useful for cases where we want to rewind a job (that reached 
> the FINISHED state) to a previous checkpoint.
> - When we use externalized checkpoints, we want to fully delegate the 
> checkpoint clean-up to an external process in all job states (without 
> cherrypicking FINISHED state to be cleaned up by Flink).
> We have a quick fix working in our fork where we've changed 
> `ExternalizedCheckpointCleanup` enum:
> {code:java}
> RETAIN_ON_FAILURE (renamed from DELETE_ON_CANCELLATION; retains on FAILED)
> RETAIN_ON_CANCELLATION (kept the same; retains on FAILED, CANCELED)
> RETAIN_ON_SUCCESS (added; retains on FAILED, CANCELED, FINISHED)
> {code}
> Since this change requires changes to multiple components (e.g. config 
> values, REST API, Web UI, etc), I wanted to get the community's thoughts 
> before I invest more time in my quick fix PR (which currently only contains 
> minimal change to get this working).



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to