Hi, we should consider very carefully whether we should build something like stop-with-checkpoint at all. Semantically and conceptually, checkpoints should be more and more internally managed by Flink [1], and users should use them very sparingly from the development perspective. Savepoints are the right tool for users to do things like job migration or upgrade.
Best regards,
Jing

[1] https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints_vs_savepoints/

On Mon, Oct 24, 2022 at 11:06 AM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi Saurabh and Yun Tang,
>
> I tend to agree with Yun Tang. Exposure of stop-with-checkpoint would complicate the system for most of the users a bit too much, with very little gain.
>
> > 1. In producing a full snapshot, I see this is noted as follows in the Flip
>
> If you want to recover in CLAIM or NO_CLAIM mode, that's an orthogonal question to whether you ended up with a savepoint or checkpoint at the end of your last job. So that's not an issue here. As Yun Tang wrote, the only difference is that stop-with-savepoint (vs a hypothetical stop-with-checkpoint) has to copy out files from previous incremental checkpoints that are still being referenced. I don't expect this to be a big issue for the majority of use cases (only if you have a really huge state, and the async phase of the checkpoint/savepoint is taking a very long time - and only when assuming that you don't have another issue that's causing slow file upload). And if that's an issue, you still have the other work-around for your initial problem: to manually trigger all of the checkpoints as you wish via the REST API.
>
> Best,
> Piotrek
>
> On Mon, Oct 24, 2022 at 10:13 Yun Tang <myas...@live.com> wrote:
>
> > Hi Saurabh,
> >
> > From the scope of implementation, I think stopping with native savepoint is very close to stopping with checkpoint. The only different part is the fast duplication over distributed file systems, which could be mitigated via distributed file system shallow copy. Thus, I don't think we should introduce another concept called stopping with checkpoint to make the total system more complex.
> >
> > Best
> > Yun Tang
> > ________________________________
> > From: Saurabh Kaul <saurk...@gmail.com>
> > Sent: Saturday, October 22, 2022 2:45
> > To: dev@flink.apache.org <dev@flink.apache.org>
> > Subject: Re: [DISCUSS] REST API to suspend & resume checkpointing
> >
> > Hi Piotr,
> >
> > Sorry for the confusion. I'm referring to latency concerns at 2 points -
> >
> > 1. In producing a full snapshot, I see this is noted as follows in the Flip [1]:
> > > This means that if the underlying FileSystem doesn’t support fast duplication, incremental savepoints will most likely be still slower compared to incremental checkpoints.
> > In our use case for example, we use the S3 file system, so we have some additional overhead of duplication.
> > 2. I mixed up the savepoint recovery modes, I think the default NO_CLAIM mode would be more appropriate for production use generally since CLAIM mode carries the risk of accidental savepoint deletion when future checkpoints can build on top of it. Manual checkpoint deletion is not a concern. NO_CLAIM would require the first successful checkpoint to be a full checkpoint.
> >
> > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-203
> >
> > On Fri, Oct 21, 2022 at 1:31 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> >
> > > Hi Saurabh,
> > >
> > > > 1. Stop with native savepoint does solve any races and produces a predictable restoration point, but producing a self-contained snapshot and using CLAIM mode in re-running is not necessary here and adds latency.
> > >
> > > Why do you think it adds latency? CLAIM mode is actually the fastest one with zero additional overheads. Native savepoints are almost an equivalent of checkpoints.
> > >
> > > Best,
> > > Piotrek
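For context on the CLAIM-mode restore mentioned above: on the client side, resuming the new job from a native savepoint in CLAIM mode is just a matter of a few configuration entries. A minimal sketch in Java, assuming the option names documented around Flink 1.15/1.16 (execution.savepoint.path, execution.savepoint-restore-mode, execution.savepoint.ignore-unclaimed-state); please verify the key names against your version:

    import org.apache.flink.configuration.Configuration;

    // Sketch only: in practice these settings usually come from flink-conf.yaml or the CLI
    // (the savepoint path plus the restore-mode flag); the key names below are taken from
    // the Flink 1.15/1.16 docs as I remember them, so double-check them for your version.
    public class ClaimModeRestoreSettings {

        public static Configuration forNativeSavepoint(String savepointPath) {
            Configuration conf = new Configuration();
            // Where stop-with-savepoint (native format) wrote its snapshot.
            conf.setString("execution.savepoint.path", savepointPath);
            // CLAIM: Flink takes ownership of the snapshot, so incremental checkpoints can
            // build on it right away; NO_CLAIM instead forces the first checkpoint to be full.
            conf.setString("execution.savepoint-restore-mode", "CLAIM");
            // Fail the submission if some state in the snapshot cannot be mapped to the new job.
            conf.setBoolean("execution.savepoint.ignore-unclaimed-state", false);
            return conf;
        }
    }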
> > > On Fri, Oct 21, 2022 at 00:51 Saurabh Kaul <saurk...@gmail.com> wrote:
> > >
> > > > Hi, thanks for the quick responses,
> > > >
> > > > I think a stop-with-checkpoint idea is overlapping well with the requirements.
> > > >
> > > > 1. Stop with native savepoint does solve any races and produces a predictable restoration point, but producing a self-contained snapshot and using CLAIM mode in re-running is not necessary here and adds latency. Stop-with-checkpoint doesn't have these issues. It adds some downtime in waiting for a checkpoint to be completed but reduces replay time in the new cluster, which is a good trade-off. Since in this scenario of job migration the job and/or job configuration is not changing, it should ideally be as fast as a regular failover scenario (like a TM going down).
> > > > 2. Taking complete ownership of triggering checkpoints and making them more configurable could be feasible but are less effective comparatively in terms of stopping the job for the primary purpose of low-downtime migration of the job. Stop-with-checkpoint solves it more directly.
> > > >
> > > > Looking forward to hearing thoughts on this.
> > > >
> > > > On Thu, Oct 20, 2022 at 3:31 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
> > > >
> > > > > Hi Saurabh,
> > > > >
> > > > > Thanks for reaching out with the proposal. I have some mixed feelings about this for a couple of reasons:
> > > > >
> > > > > 1. It sounds like the core problem that you are describing is the race condition between shutting down the cluster and completion of new checkpoints. My first thought would be as Jing's, why don't you use stop-with-savepoint? Especially the native savepoint? You can recover from it using --claim mode, so the whole process should be quite fast actually.
> > > > > 2. The same issue, not knowing the latest completed checkpoint id, plagued us with some internal tests for quite a bit, so maybe this would also be worth considering to address instead? Like leaving in some text file the last completed checkpoint id? Or providing a way to read this from some existing metadata files? However, in our tests we actually fixed/worked around that with manual triggering of checkpoints. The predecessor of FLINK-27101 [1], FLINK-24280 [2], was implemented to address this exact issue. Which brings me to...
> > > > > 3. You could actually just use the REST API to trigger all checkpoints manually. The idea behind FLINK-27101 [1] was to add full flexibility to the users, without adding much complexity to the system. If we start adding more REST calls to control checkpointing behaviour it would complicate the system.
> > > > > 4. If at all, I would think more towards a more generic idea of dynamically reconfiguring the system. We could provide a generic way to dynamically change configuration options. We wouldn't be able to support all configurations, and furthermore, each "dynamic" option would have to be handled/passed down to and through the system differently, BUT we wouldn't have to do all of that at once. We could start with a very limited set of dynamic options, for example just with the checkpointing interval. This must have been considered/discussed before, so I might be missing lots of things.
> > > > > 5. Another direction, if 1. is not an option for some reason, is to provide a stop-with-checkpoint feature?
> > > > >
> > > > > Best Piotrek
> > > > >
> > > > > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > > > > [2] https://issues.apache.org/jira/browse/FLINK-24280
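To make point 3 above concrete: once FLINK-27101 is available, triggering a checkpoint from a control plane is a single REST call against the JobManager. A rough sketch, assuming the endpoint shape proposed in that ticket (POST /jobs/<jobid>/checkpoints); its availability and the exact request/response format depend on the Flink version:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class TriggerCheckpoint {

        public static void main(String[] args) throws Exception {
            String jobManager = "http://localhost:8081"; // REST address of the JobManager
            String jobId = args[0];

            // Endpoint path assumed from FLINK-27101; verify it for your Flink version.
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(jobManager + "/jobs/" + jobId + "/checkpoints"))
                    .header("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{}"))
                    .build();

            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

            // Like the savepoint endpoints, this is expected to return a trigger id that can
            // be polled for completion; the raw response is printed here for simplicity.
            System.out.println(response.statusCode() + " " + response.body());
        }
    }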
> > > > > On Thu, Oct 20, 2022 at 11:53 Jing Ge <j...@ververica.com> wrote:
> > > > >
> > > > > > Hi Saurabh,
> > > > > >
> > > > > > In general, it is always good to add new features. I am not really sure if I understood your requirement. I guess it would take too long for you to resume the job from a newly created savepoint in the new stand-by Flink cluster. But if that would be acceptable to you, you should not have the issue you mentioned with the checkpoint. Speaking of checkpoints, if the checkpoint interval were set properly, it should be fine even if in some rare cases the last checkpoint was partially completed and is not selected. Another option could be to trigger a manual checkpoint and then use that one to resume the job to maintain the low downtime.
> > > > > >
> > > > > > Best regards,
> > > > > > Jing
> > > > > >
> > > > > > On Thu, Oct 20, 2022 at 7:53 AM Saurabh Kaul <saurk...@gmail.com> wrote:
> > > > > >
> > > > > > > Hey everyone,
> > > > > > >
> > > > > > > I will create a FLIP, but wanted to gauge community opinion first. The motivation is that production Flink applications frequently need to go through node/image patching to update the software and AMI with the latest security fixes. These patching-related restarts do not involve application jar or parallelism updates and can therefore be done without costly savepoint completion and restore cycles, by relying on the last checkpoint state in order to achieve minimum downtime. In order to achieve this, we currently rely on retained checkpoints and the following steps:
> > > > > > >
> > > > > > >    - Create new stand-by Flink cluster and submit application jar
> > > > > > >    - Delete Flink TM deployment to stop processing & checkpoints on old cluster (reduce duplicates)
> > > > > > >    - Query last completed checkpoint from REST API on JM of old cluster
> > > > > > >    - Submit new job using last available checkpoint in new cluster, delete old cluster
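The "query last completed checkpoint" step in the list above can be done against the checkpointing statistics endpoint of the old JobManager. A small sketch in Java, assuming GET /jobs/<jobid>/checkpoints and reading the latest completed checkpoint's external path from the response (field names from memory; verify against the REST docs for your version):

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class QueryLatestCheckpoint {

        public static void main(String[] args) throws Exception {
            String oldJobManager = "http://old-cluster-jm:8081"; // hypothetical JM address
            String jobId = args[0];

            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(oldJobManager + "/jobs/" + jobId + "/checkpoints"))
                    .GET()
                    .build();

            HttpResponse<String> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());

            // The JSON body contains statistics for the latest completed checkpoint; its
            // external path is what the new cluster's job submission would point at.
            System.out.println(response.body());
        }
    }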
> > > > > > > We have observed that this process will sometimes not select the latest checkpoint, as partially completed checkpoints race and finish after querying the JM. Alternatives are to rely on creating other sources for checkpoint info, but this has complications, as discussed in [2]. Waiting increases downtime, and force-deleting task managers doesn't guarantee TM process termination. In order to maintain low downtime, avoid duplicates, and solve this race, we can introduce an API to suspend checkpointing. Querying the latest available checkpoint after having suspended checkpointing will guarantee that we can maintain exactly-once in such a scenario.
> > > > > > >
> > > > > > > This also acts as an extension to [1], where the feature to trigger checkpoints through a control plane has been discussed and added. It makes the checkpointing process flexible and gives the user more control in scenarios like migrating applications and letting data processing catch up temporarily.
> > > > > > >
> > > > > > > We can implement this similar to [1] and expose a trigger to suspend and resume checkpointing via CLI and REST API. We can add a parameter to suspend in 2 ways:
> > > > > > >
> > > > > > >    1. Suspend the scheduled checkpoint trigger: doesn't cancel any still in-progress checkpoints/savepoints but stops only future ones
> > > > > > >    2. Suspend the checkpoint coordinator: cancels in-progress checkpoints/savepoints. Guarantees no racing checkpoint completion and could be used for canceling stuck checkpoints and helping data processing catch up
> > > > > > >
> > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-27101
> > > > > > > [2] https://issues.apache.org/jira/browse/FLINK-26916
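To illustrate the two suspension modes described above, one possible shape for such an API is sketched below. This is purely illustrative: neither endpoint nor the cancelInFlight parameter exists in Flink today; the paths and names are invented here only to show what the proposal describes:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    // Hypothetical API shape only: the paths and the "cancelInFlight" flag are made up to
    // illustrate the two proposed modes; they are not part of any released Flink version.
    public class SuspendCheckpointingSketch {

        private static final HttpClient CLIENT = HttpClient.newHttpClient();

        // Mode 1 (cancelInFlight=false): stop scheduling future checkpoints only.
        // Mode 2 (cancelInFlight=true): also cancel in-progress checkpoints/savepoints,
        // guaranteeing that no checkpoint can race to completion after the call returns.
        static void suspend(String jobManager, String jobId, boolean cancelInFlight) throws Exception {
            post(jobManager + "/jobs/" + jobId + "/checkpointing/suspend?cancelInFlight=" + cancelInFlight);
        }

        static void resume(String jobManager, String jobId) throws Exception {
            post(jobManager + "/jobs/" + jobId + "/checkpointing/resume");
        }

        private static void post(String url) throws Exception {
            HttpRequest request = HttpRequest.newBuilder()
                    .uri(URI.create(url))
                    .POST(HttpRequest.BodyPublishers.noBody())
                    .build();
            HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.statusCode());
        }
    }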