Re: [DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-06 Thread Martijn Visser
Hi Dongwoo,

Thanks for the proposal. I don't think this is a good idea, especially
since there's no guarantee that a savepoint will succeed if a checkpoint
has already failed. I also don't think it's necessarily a good idea for
Flink to change because of an external factor: isn't there another
mechanism that you would normally use to determine that your storage
layer has an issue?
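
For instance, a small canary probe that periodically writes and reads an
object in the checkpoint bucket, run completely outside of Flink, would
already surface this. A minimal sketch; the endpoint, bucket name, and
alert threshold below are assumptions, not anything concrete:

    import time
    import boto3
    from botocore.exceptions import BotoCoreError, ClientError

    S3_ENDPOINT = "https://object-store.example.com"   # assumed endpoint
    BUCKET = "flink-checkpoints"                       # assumed bucket

    def checkpoint_storage_healthy() -> bool:
        # Write and read back a tiny canary object against the same bucket
        # that holds the checkpoints.
        s3 = boto3.client("s3", endpoint_url=S3_ENDPOINT)
        try:
            s3.put_object(Bucket=BUCKET, Key="health/canary", Body=b"ping")
            s3.get_object(Bucket=BUCKET, Key="health/canary")
            return True
        except (BotoCoreError, ClientError):
            return False

    failures = 0
    while True:
        failures = 0 if checkpoint_storage_healthy() else failures + 1
        if failures >= 5:          # assumed threshold before alerting
            print("checkpoint storage unhealthy - alert / start failover")
        time.sleep(60)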

Best regards,

Martijn


Re: [DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-06 Thread Dongwoo Kim
Hi Yanfei, Hangxiang.

Thank you for taking the time to review my suggestions.
I agree with Hangxiang that simply triggering a savepoint based on repeated
checkpoint failures isn't a reasonable approach.
Adding a specific condition, such as requiring *CHECKPOINT_ASYNC_EXCEPTION*
as the reason for the last checkpoint failure, could make it more practical,
although it's not a perfect solution.
Regarding the restart policy, my initial thought was to stop the job after
creating a savepoint.
I'm open to further discussion about refining the restart policy, especially
if the community is interested in the idea of a primary/standby checkpoint
storage setup.
However, Hangxiang's suggestion to utilize Flink's REST API hadn't crossed
my mind, and it seems to address my needs well.

I'll try leveraging the REST API to implement a failover strategy for
checkpoint storage failures (a rough sketch of what I have in mind is below).
Thank you again for your insights, they've been extremely helpful.
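
Roughly something like this; the REST address, job id, and response field
names are assumptions to be double-checked against the REST API docs of the
Flink version in use:

    import requests  # external script polling the Flink REST API

    FLINK_REST = "http://jobmanager:8081"   # assumed JobManager REST address
    JOB_ID = "<job-id>"                     # assumed job id

    def looks_like_storage_failure() -> bool:
        # GET /jobs/:jobid/checkpoints returns checkpoint statistics,
        # including information about the most recent failed checkpoint.
        stats = requests.get(
            f"{FLINK_REST}/jobs/{JOB_ID}/checkpoints", timeout=10
        ).json()
        failed = (stats.get("latest") or {}).get("failed") or {}
        message = failed.get("failure_message", "") or ""
        # Heuristic only: treat async/storage-looking exceptions as a sign
        # that the checkpoint storage itself is the problem.
        return "Asynchronous" in message or "s3" in message.lower()

    print("storage-related checkpoint failure suspected:",
          looks_like_storage_failure())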

Best,
Dongwoo


Re: [DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-06 Thread Hangxiang Yu
Hi, Dongwoo.
IIUC, you mean using a savepoint to store a snapshot in other storage if
checkpoints fail multiple times due to some long-lasting exceptions from the
external storage, right?
I think it's better to achieve this with an external tool instead of
introducing a config like that:
1. It's sometimes not easy to judge whether an exception is caused by the
external storage or not, and it's not really reasonable to simply trigger a
savepoint whenever checkpoints fail multiple times.
2. It's better to leave the logic for triggering savepoints, e.g. periodic
savepoints or stop-with-savepoint, to external tools or platforms. As you
can see from [1], we intend to keep their scopes clear.

Maybe you could periodically check the status and failure message via [2]
in your external tool or platform, and then trigger a savepoint or
stop-with-savepoint through the REST API or CLI (a sketch follows the links
below).

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/state/checkpoints_vs_savepoints/
[2]
https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/ops/rest_api/#jobs-jobid-checkpoints
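
A minimal sketch of such an external check, assuming the REST address, job
id, threshold, and savepoint directory below; the request/response field
names should be verified against the REST API docs of your Flink version:

    import time
    import requests

    FLINK_REST = "http://jobmanager:8081"        # assumed JobManager REST address
    JOB_ID = "<job-id>"                          # assumed job id
    SAVEPOINT_DIR = "hdfs:///flink/savepoints"   # assumed fallback savepoint dir

    def failed_checkpoint_count() -> int:
        # [2] GET /jobs/:jobid/checkpoints exposes checkpoint counts and the
        # latest failure message.
        stats = requests.get(
            f"{FLINK_REST}/jobs/{JOB_ID}/checkpoints", timeout=10
        ).json()
        return stats.get("counts", {}).get("failed", 0)

    def stop_with_savepoint() -> str:
        # POST /jobs/:jobid/stop triggers stop-with-savepoint and returns an
        # id that can be polled for completion of the operation.
        resp = requests.post(
            f"{FLINK_REST}/jobs/{JOB_ID}/stop",
            json={"targetDirectory": SAVEPOINT_DIR, "drain": False},
            timeout=10,
        )
        return resp.json().get("request-id", "")

    consecutive = 0
    baseline = failed_checkpoint_count()
    while True:
        time.sleep(60)
        current = failed_checkpoint_count()
        consecutive = consecutive + 1 if current > baseline else 0
        baseline = current
        if consecutive >= 3:   # assumed: three polls in a row with new failures
            print("stop-with-savepoint triggered:", stop_with_savepoint())
            break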

-- 
Best,
Hangxiang.


Re: [DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-05 Thread Yanfei Lei
Hi Dongwoo,

If checkpoints have already failed
`execution.checkpointing.tolerable-failed-checkpoints` times, then
stopWithSavepoint is likely to fail as well.
Whether stopWithSavepoint succeeds or fails, will the job just stop? I am
more curious about how this option would interact with the restart strategy.

Best,
Yanfei


[DISCUSS] Add config to enable job stop with savepoint on exceeding tolerable checkpoint Failures

2023-09-04 Thread Dongwoo Kim
Hi all,
I have a proposal that aims to enhance a Flink application's resilience
against unexpected failures of checkpoint storage such as S3 or HDFS.

*[Background]*
When using self-managed S3-compatible object storage, we faced async
checkpoint failures lasting for an extended period (more than 30 minutes),
leading to multiple job restarts and causing lag in our streaming
application.

*[Current Behavior]*
Currently, when the number of checkpoint failures exceeds a predefined
tolerable limit, Flink will either restart or fail the job, depending on how
it's configured.
In my opinion this does not handle scenarios where the checkpoint storage
itself is unreliable or experiencing downtime.

*[Proposed Feature]*
I propose a config option that allows a graceful job stop with a savepoint
when the tolerable checkpoint failure limit is reached.
Instead of restarting/failing the job once the tolerable checkpoint failure
count is exceeded, Flink would simply trigger stopWithSavepoint when this
new config is set to true.

This could offer the following benefits:
- Indication of checkpoint storage state: Exceeding tolerable checkpoint
failures could indicate unstable checkpoint storage.
- Automated fallback strategy: When combined with a monitoring cron job,
this feature could act as an automated fallback strategy for handling
unstable checkpoint storage.
  The job would stop safely, take a savepoint, and then you could
automatically restart it with different checkpoint storage configured, e.g.
switching from S3 to HDFS.

For example, let's say the checkpoint path is configured to S3 and the
savepoint path is configured to HDFS.
When the new config is set to true, the job stops with a savepoint, as shown
below, once the tolerable checkpoint failure count is exceeded.
We can then restart the job from that savepoint with the checkpoint path
reconfigured to HDFS.
[image: image.png]
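
To make the fallback concrete, the restart step after stop-with-savepoint
could look roughly like the sketch below. The option name of the proposed
config is only a hypothetical placeholder, and the savepoint path,
checkpoint directory, and job jar are assumptions for illustration:

    # Hypothetical new option (name not final) that would enable the
    # proposed behavior in flink-conf.yaml:
    #   execution.checkpointing.stop-with-savepoint-on-exceeding-tolerable-failures: true
    #
    # Once the job has stopped with a savepoint on HDFS, an external script
    # can resubmit it with the checkpoint storage switched from S3 to HDFS.
    import subprocess

    SAVEPOINT = "hdfs:///flink/savepoints/savepoint-xxxx"  # assumed savepoint path
    JOB_JAR = "/opt/jobs/streaming-job.jar"                # assumed job artifact

    subprocess.run(
        [
            "flink", "run",
            "-s", SAVEPOINT,                               # restore from the savepoint
            "-D", "state.checkpoints.dir=hdfs:///flink/checkpoints",  # switch to HDFS
            JOB_JAR,
        ],
        check=True,
    )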


Looking forward to hearing the community's thoughts on this proposal.
I would also like to ask how the community handles long-lasting unstable
checkpoint storage issues.

Thanks in advance.

Best,
Dongwoo