Hi Saurabh,

Thanks for reaching out with the proposal. I have some mixed feelings about
this, for a few reasons:

1. It sounds like the core problem you are describing is the race
condition between shutting down the cluster and the completion of new
checkpoints. My first thought is the same as Jing's: why don't you use
stop-with-savepoint, especially a native savepoint? You can recover from
it using --claim mode, so the whole process should actually be quite fast
(see the first sketch after this list).
2. The same issue, not knowing the latest completed checkpoint id, plagued
some of our internal tests for quite a while, so maybe this would also be
worth addressing instead? For example by leaving the last completed
checkpoint id in some text file, or by providing a way to read it from the
existing metadata files? However, in our tests we actually fixed/worked
around it by manually triggering checkpoints. The predecessor of
FLINK-27101 [1], FLINK-24280 [2], was implemented to address this exact
issue. Which brings me to...
3. You could actually just use the REST API to trigger all checkpoints
manually (see the second sketch after this list). The idea behind
FLINK-27101 [1] was to give users full flexibility without adding much
complexity to the system. If we start adding more REST calls to control
the checkpointing behaviour, it would complicate the system.
4. If anything, I would lean towards the more generic idea of dynamically
reconfiguring the system. We could provide a generic way to change
configuration options at runtime. We wouldn't be able to support all
options, and furthermore each "dynamic" option would have to be
handled/passed down through the system differently, BUT we wouldn't have
to do all of that at once. We could start with a very limited set of
dynamic options, for example just the checkpointing interval. This must
have been considered/discussed before, so I might be missing lots of
things.
5. Another direction, if 1. is not an option for some reason, would be to
provide a stop-with-checkpoint feature?
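
To make 1. more concrete, here is a minimal sketch of how the
stop-with-savepoint + claim-mode restore flow could be scripted against the
JobManager REST API. It assumes the /jobs/<jobid>/stop and
/jobs/<jobid>/savepoints/<triggerid> endpoints, the "formatType": "NATIVE"
field (Flink >= 1.15), and placeholder addresses/paths, so please double-check
the exact fields against the REST API docs of your Flink version:

# Sketch only: stop-with-savepoint (native format) via the JobManager REST
# API, then restore the job on the new cluster in claim mode.
import json
import time
import urllib.request

JM = "http://old-jobmanager:8081"   # assumption: REST address of the old cluster
JOB_ID = "<job-id>"                 # assumption: id of the running job

def post(url, body):
    req = urllib.request.Request(
        url, data=json.dumps(body).encode(),
        headers={"Content-Type": "application/json"})
    return json.load(urllib.request.urlopen(req))

def get(url):
    return json.load(urllib.request.urlopen(url))

# Trigger stop-with-savepoint; "formatType": "NATIVE" requires Flink >= 1.15.
trigger = post(f"{JM}/jobs/{JOB_ID}/stop",
               {"targetDirectory": "s3://bucket/savepoints",
                "drain": False,
                "formatType": "NATIVE"})

# Poll the asynchronous operation until the savepoint location is known.
while True:
    status = get(f"{JM}/jobs/{JOB_ID}/savepoints/{trigger['request-id']}")
    if status["status"]["id"] == "COMPLETED":
        savepoint_path = status["operation"]["location"]
        break
    time.sleep(1)

print("restore the new job from", savepoint_path)
# On the new cluster, submit the job with (Flink >= 1.15):
#   execution.savepoint.path: <savepoint_path>
#   execution.savepoint-restore-mode: CLAIM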
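
And a second sketch for 2. and 3.: trigger a checkpoint manually (assuming
the endpoint added by FLINK-27101 is POST /jobs/<jobid>/checkpoints,
available from Flink 1.17) and then read the latest completed checkpoint
id/path from the checkpoint statistics. The field names below are my
assumptions from the REST docs, so please verify them for your version:

# Sketch only: manually trigger a checkpoint, then read the latest
# completed one from the checkpoint statistics.
import json
import time
import urllib.request

JM = "http://old-jobmanager:8081"   # assumption: REST address of the old cluster
JOB_ID = "<job-id>"                 # assumption: id of the running job

# 1) Trigger a checkpoint (FLINK-27101, Flink >= 1.17); this returns an async
#    trigger id that a real script would poll instead of sleeping below.
req = urllib.request.Request(f"{JM}/jobs/{JOB_ID}/checkpoints", data=b"{}",
                             headers={"Content-Type": "application/json"})
urllib.request.urlopen(req)

# 2) Read the checkpoint statistics and pick the latest completed checkpoint.
time.sleep(5)
with urllib.request.urlopen(f"{JM}/jobs/{JOB_ID}/checkpoints") as resp:
    stats = json.load(resp)

latest = stats["latest"]["completed"]
print("latest completed checkpoint:", latest["id"], latest["external_path"])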

Best Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-27101
[2] https://issues.apache.org/jira/browse/FLINK-24280

On Thu, Oct 20, 2022 at 11:53 AM Jing Ge <j...@ververica.com> wrote:

> Hi Saurabh,
>
> In general, it is always good to add new features, but I am not really sure
> I understood your requirement. I guess resuming the job from a newly created
> savepoint in the new stand-by Flink cluster would take too long for you. But
> if that were acceptable, you should not have the checkpoint issue you
> mentioned. Speaking of checkpoints, if the checkpoint interval is set
> properly, it should be fine even if, in some rare cases, the last checkpoint
> is only partially completed and is not selected. Another option could be to
> trigger a manual checkpoint and then use that one to resume the job, in
> order to keep the downtime low.
>
> Best regards,
> Jing
>
> On Thu, Oct 20, 2022 at 7:53 AM Saurabh Kaul <saurk...@gmail.com> wrote:
>
> > Hey everyone,
> >
> > I will create a FLIP, but wanted to gauge community opinion first. The
> > motivation is that production Flink applications frequently need to go
> > through node/image patching to update the software and the AMI with the
> > latest security fixes. These patching-related restarts do not involve
> > application jar or parallelism updates, and can therefore be done without
> > costly savepoint completion and restore cycles by relying on the last
> > checkpoint state, in order to achieve minimum downtime. To achieve this,
> > we currently rely on retained checkpoints and the following steps:
> >
> >    - Create a new stand-by Flink cluster and submit the application jar
> >    - Delete the Flink TM deployment to stop processing & checkpoints on
> >      the old cluster (to reduce duplicates)
> >    - Query the last completed checkpoint from the REST API on the JM of
> >      the old cluster
> >    - Submit the new job using the last available checkpoint in the new
> >      cluster, and delete the old cluster
> >
> > We have observed that this process will sometimes not select the latest
> > checkpoint, as partially completed checkpoints race and finish after the
> > JM is queried. An alternative is to rely on other sources of checkpoint
> > info, but this has complications, as discussed in [2]. Waiting increases
> > downtime, and force-deleting task managers doesn't guarantee TM process
> > termination. In order to maintain low downtime, avoid duplicates and
> > solve this race, we can introduce an API to suspend checkpointing.
> > Querying the latest available checkpoint after having suspended
> > checkpointing will guarantee that we can maintain exactly-once in such a
> > scenario.
> >
> > This also acts as an extension to [1], where the feature to trigger
> > checkpoints through a control plane has been discussed and added. It
> > makes the checkpointing process flexible and gives the user more control
> > in scenarios like migrating applications and temporarily letting data
> > processing catch up.
> > We can implement this similarly to [1] and expose a trigger to suspend
> > and resume checkpointing via the CLI and REST API. We can add a parameter
> > to suspend in two ways:
> >
> >    1. Suspend the scheduled checkpoint trigger: doesn't cancel any
> >       checkpoints/savepoints that are still in progress, but only stops
> >       future ones
> >    2. Suspend the checkpoint coordinator: cancels in-progress
> >       checkpoints/savepoints. Guarantees no racing checkpoint completion
> >       and could be used for canceling stuck checkpoints and helping data
> >       processing catch up
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > [2] https://issues.apache.org/jira/browse/FLINK-26916
> >
>
