Re: [DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint failures
Hi Dongwoo,

Thanks for the proposal. I don't think this is a good idea, especially since there's no guarantee that a savepoint will succeed when a checkpoint has just failed. I also don't think it's necessarily a good idea for Flink to change because of an external factor: isn't there another mechanism that you would normally use to determine that your storage layer has an issue?

Best regards,

Martijn

On Wed, Sep 6, 2023 at 3:40 PM Dongwoo Kim wrote:
Hi Yanfei, Hangxiang,

Thank you for taking the time to review my suggestion. I agree with Hangxiang that simply triggering a savepoint based on repeated checkpoint failures isn't a reasonable approach. Adding a specific condition, such as *CHECKPOINT_ASYNC_EXCEPTION* being the reason for the last checkpoint failure, could make it more practical, although it's not a perfect solution.

Regarding the restart policy, my initial thought was to stop the job after creating a savepoint. I was open to further discussion about refining the restart policy, especially if the community was interested in the idea of a primary/standby checkpoint storage setup. However, Hangxiang's suggestion to use Flink's REST API hadn't crossed my mind, and it seems to address my needs well.

I'll try leveraging the REST API to implement a failover strategy for checkpoint storage failures. Thank you again for your insights; they've been extremely helpful.

Best,
Dongwoo

On Wed, Sep 6, 2023 at 4:57 PM, Hangxiang Yu wrote:
Hi, Dongwoo.

IIUC, you mean using a savepoint to store a snapshot in other storage if checkpoints fail multiple times due to some long-lasting exceptions in the external storage, right?

I think it's better to achieve this with an external tool instead of introducing a config like that:
1. It's not always easy to judge whether an exception is caused by the external storage or not, and it's not reasonable to trigger a savepoint just because checkpoints have failed multiple times.
2. It's better for logic around triggering savepoints, e.g. periodic savepoints or triggering stop-with-savepoint, to be handled by external tools or platforms. As you can see from [1], we intend to keep their scopes clear.

Maybe you could check the status and failure message via [2] periodically in your external tool or platform and then trigger a savepoint or stop-with-savepoint through the REST API or CLI.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/checkpoints_vs_savepoints/
[2] https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/rest_api/#jobs-jobid-checkpoints

Best,
Hangxiang.

On Wed, Sep 6, 2023 at 11:05 AM Yanfei Lei wrote:
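The external watchdog Hangxiang describes could be sketched like this. It is a minimal sketch, not a definitive implementation: the REST address, job id, polling interval, failure threshold, and savepoint directory are all assumptions, and it assumes the `history` list returned by `/jobs/:jobid/checkpoints` is ordered newest-first.

```python
# Hypothetical external watchdog: poll the checkpoint statistics of a job
# and trigger stop-with-savepoint once too many consecutive checkpoints
# have failed. Uses only the Python standard library.
import json
import time
from urllib.request import Request, urlopen


def consecutive_failures(stats: dict) -> int:
    """Count checkpoints that failed since the last non-failed one, using
    the 'history' list from /jobs/:jobid/checkpoints (assumed newest-first)."""
    n = 0
    for cp in stats.get("history", []):
        if cp.get("status") == "FAILED":
            n += 1
        else:
            break
    return n


def should_stop(stats: dict, tolerable: int = 3) -> bool:
    # Mirrors execution.checkpointing.tolerable-failed-checkpoints externally;
    # the threshold of 3 is an arbitrary example.
    return consecutive_failures(stats) >= tolerable


def watchdog(flink: str, job_id: str, savepoint_dir: str) -> None:
    """Poll checkpoint stats every 30s; on sustained failures, POST to
    /jobs/:jobid/stop, which triggers stop-with-savepoint."""
    while True:
        with urlopen(f"{flink}/jobs/{job_id}/checkpoints") as resp:
            stats = json.load(resp)
        if should_stop(stats):
            body = json.dumps({"targetDirectory": savepoint_dir,
                               "drain": False}).encode()
            urlopen(Request(f"{flink}/jobs/{job_id}/stop", data=body,
                            headers={"Content-Type": "application/json"}))
            return
        time.sleep(30)


# Example wiring (placeholder values):
# watchdog("http://localhost:8081", "<job-id>", "hdfs:///flink/savepoints")
```

Once the job has stopped with a savepoint, the tool could resubmit it from that savepoint with a different checkpoint storage configured, which is the fallback flow discussed in this thread.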
Hi Dongwoo,

If the checkpoint has failed `execution.checkpointing.tolerable-failed-checkpoints` times, then stopWithSavepoint is likely to fail as well. And whether stopWithSavepoint succeeds or fails, will the job just stop? I am more curious about how this option would work with the restart strategy.

Best,
Yanfei

On Mon, Sep 4, 2023 at 10:17 PM, Dongwoo Kim wrote:
[DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint failures
Hi all,

I have a proposal that aims to enhance a Flink application's resilience in cases of unexpected failures in checkpoint storage like S3 or HDFS.

*[Background]*
When using self-managed S3-compatible object storage, we faced checkpoint async failures lasting for an extended period, more than 30 minutes, leading to multiple job restarts and causing lag in our streaming application.

*[Current Behavior]*
Currently, when the number of checkpoint failures exceeds a predefined tolerable limit, Flink will either restart or fail the job, depending on how it's configured. In my opinion this does not handle scenarios where the checkpoint storage itself may be unreliable or experiencing downtime.

*[Proposed Feature]*
I propose a config that allows a graceful job stop with a savepoint when the tolerable checkpoint failure limit is reached. Instead of restarting or failing the job when the tolerable checkpoint failure count is exceeded, when this new config is set to true we would just trigger stopWithSavepoint.

This could offer the following benefits.
- Indication of checkpoint storage state: exceeding tolerable checkpoint failures could indicate unstable checkpoint storage.
- Automated fallback strategy: combined with a monitoring cron job, this feature could act as an automated fallback strategy for handling unstable checkpoint storage. The job would stop safely, take a savepoint, and then you could automatically restart it with different checkpoint storage configured, like switching from S3 to HDFS.

For example, let's say the checkpoint path is configured to S3 and the savepoint path to HDFS. When the new config is set to true, the job stops with a savepoint once the tolerable checkpoint failure count is exceeded, and we can then restart the job from that savepoint with the checkpoint path configured to HDFS.

Looking forward to hearing the community's thoughts on this proposal. I also want to ask how the community is handling long-lasting unstable checkpoint storage issues.

Thanks in advance.

Best,
Dongwoo
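For concreteness, the S3-to-HDFS setup described above might look like the following flink-conf.yaml sketch. The first three keys are existing Flink options (paths are placeholders); the last key is the option proposed in this thread, so it is hypothetical and does not exist in Flink.

```yaml
# Existing options: checkpoints go to S3, savepoints to HDFS.
state.checkpoints.dir: s3://my-bucket/checkpoints
state.savepoints.dir: hdfs:///flink/savepoints
execution.checkpointing.tolerable-failed-checkpoints: 3

# Proposed (hypothetical) option: instead of restarting/failing the job,
# trigger stop-with-savepoint once the tolerable failure count is exceeded.
execution.checkpointing.stop-with-savepoint-on-exceeding-tolerable-failures: true
```

With such a setup, the savepoint written to HDFS would serve as the handoff point for restarting the job with HDFS as the checkpoint storage.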