Discard checkpoint files through a single recursive call

2021-06-14 Thread Jiahui Jiang
Hello Flink!

We are building an infrastructure where we implement our own
CompletedCheckpointStore. Reads and writes to the external storage location
of these checkpoints go through HTTP calls to an external service.

Recently we noticed a performance issue with checkpoint file cleanup when a
job writes out a very large number of checkpoint files per checkpoint. (In our
case we had a few hundred operators and ran with a parallelism of 16.)
During the checkpoint state discard phase, the implementation in
CompletedCheckpoint discards the state files one by one, so we are seeing a
very high number of remote calls. Sometimes the deletion fails to keep up with
checkpoint progress.

The interface we are given to configure the external storage location for
checkpoints is always a `target directory`. Would it be reasonable to expose an
implementation of discard() that directly calls disposeStorageLocation with
recursive set to true, without iterating over each individual file first? Are
there any blockers for that?

Thank you!


links
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L240
https://github.com/apache/flink/blob/99c2a415e9eeefafacf70762b6f54070f7911ceb/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCompletedCheckpointStorageLocation.java#L70
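
To make the difference in remote calls concrete, here is a minimal sketch in
plain Java. It does not use Flink's classes: RemoteStorage, CountingStorage,
the file naming, the "chk-42" directory, and the assumption of exactly one
state file per operator subtask are all hypothetical and only serve to count
calls. The figure of 300 operators is an illustrative stand-in for "a few
hundred".

```java
import java.util.ArrayList;
import java.util.List;

public class DiscardCallCount {

    /** Hypothetical stand-in for the HTTP-backed storage service. */
    interface RemoteStorage {
        void delete(String path, boolean recursive);
    }

    /** Records every delete request so the number of remote calls can be compared. */
    static final class CountingStorage implements RemoteStorage {
        final List<String> calls = new ArrayList<>();

        @Override
        public void delete(String path, boolean recursive) {
            calls.add((recursive ? "DELETE(recursive) " : "DELETE ") + path);
        }
    }

    /** Assumption: one state file per operator subtask. Real jobs may write more or fewer. */
    static List<String> stateFiles(int operators, int parallelism) {
        List<String> files = new ArrayList<>();
        for (int op = 0; op < operators; op++) {
            for (int subtask = 0; subtask < parallelism; subtask++) {
                files.add("op-" + op + "-subtask-" + subtask);
            }
        }
        return files;
    }

    /** One remote call per state file, then one non-recursive call for the directory. */
    static void discardOneByOne(RemoteStorage storage, String dir, List<String> files) {
        for (String file : files) {
            storage.delete(dir + "/" + file, false);
        }
        storage.delete(dir, false);
    }

    /** A single recursive call for the whole checkpoint directory. */
    static void discardRecursively(RemoteStorage storage, String dir) {
        storage.delete(dir, true);
    }

    public static void main(String[] args) {
        List<String> files = stateFiles(300, 16);

        CountingStorage perFile = new CountingStorage();
        discardOneByOne(perFile, "chk-42", files);

        CountingStorage recursive = new CountingStorage();
        discardRecursively(recursive, "chk-42");

        System.out.println("one by one: " + perFile.calls.size() + " remote calls");
        System.out.println("recursive:  " + recursive.calls.size() + " remote call");
    }
}
```

Under these assumptions the per-file path issues 4801 requests per discarded
checkpoint, while the recursive path issues one. Whether a single recursive
delete is actually cheaper depends on how the storage service implements it.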


Re: Discard checkpoint files through a single recursive call

2021-06-15 Thread Guowei Ma
Hi Jiang,

I want to make sure I understand you correctly, so could you elaborate on how
you would like to change it? For example, which interface or class do you want
to add a method to?
Although I am not a state expert, as far as I know, because of incremental
checkpoints, when a CompletedCheckpoint is discarded it is necessary to call
the discardState method of each State.

Best,
Guowei
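
The concern about incremental checkpoints can be illustrated with a small
reference-counting model. This is a simplified sketch, not Flink's
implementation: SharedFileRegistry, its methods, and the "shared/sst-N" file
names are hypothetical. It assumes that consecutive incremental checkpoints
may reference the same file, so a file can only be deleted once no retained
checkpoint refers to it, which a recursive delete of one checkpoint's own
directory would not take into account.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class SharedStateSketch {

    /** Hypothetical reference counter for files shared between incremental checkpoints. */
    static final class SharedFileRegistry {
        private final Map<String, Integer> refCounts = new HashMap<>();
        final Set<String> deleted = new TreeSet<>();

        /** Called when a checkpoint that references these files completes. */
        void register(Collection<String> files) {
            for (String file : files) {
                refCounts.merge(file, 1, Integer::sum);
            }
        }

        /** Called per state object when a checkpoint is discarded. */
        void release(Collection<String> files) {
            for (String file : files) {
                int remaining = refCounts.merge(file, -1, Integer::sum);
                if (remaining == 0) {
                    // No retained checkpoint references this file any more.
                    refCounts.remove(file);
                    deleted.add(file);
                }
            }
        }
    }

    public static void main(String[] args) {
        List<String> checkpoint1 = Arrays.asList("shared/sst-1", "shared/sst-2");
        List<String> checkpoint2 = Arrays.asList("shared/sst-2", "shared/sst-3");

        SharedFileRegistry registry = new SharedFileRegistry();
        registry.register(checkpoint1);
        registry.register(checkpoint2);

        // Discarding checkpoint 1 must keep sst-2, which checkpoint 2 still needs.
        registry.release(checkpoint1);
        System.out.println("deleted after discarding checkpoint 1: " + registry.deleted);
    }
}
```

In this model only shared/sst-1 is deleted when checkpoint 1 is discarded.
Per-state bookkeeping of this kind is what a purely directory-based delete
would skip.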


Re: Discard checkpoint files through a single recursive call

2021-06-15 Thread Yun Tang
Hi Jiang,

Please take a look at FLINK-17860 [1] and FLINK-13856 [2] for previous
discussion of this problem.

[1] https://issues.apache.org/jira/browse/FLINK-17860
[2] https://issues.apache.org/jira/browse/FLINK-13856

Best
Yun Tang


Re: Discard checkpoint files through a single recursive call

2021-06-15 Thread Jiahui Jiang
Hello Yun and Guowei,

Thanks for the context! It looks like the plan is to have a Flink config flag
to enable recursive deletion? Is there any plan to get this PR merged for the
next release? https://github.com/apache/flink/pull/9602


Thank you so much!
Jiahui


Re: Discard checkpoint files through a single recursive call

2021-06-18 Thread Piotr Nowojski
Hi,

Unfortunately, I think there are no plans to push for this at the moment. I
would suggest that you bump or cast a vote on
https://issues.apache.org/jira/browse/FLINK-13856 so that we can prioritise
our efforts more accurately.

Best,
Piotrek
