Hi

Currently, it is hard to determine which files in the shared folder can be
deleted safely; the ground truth is in the checkpoint metafile. I've created
an issue[1] for such a feature.

[1] https://issues.apache.org/jira/browse/FLINK-17571
Best,
Congxian
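
As a stopgap until FLINK-17571, one can approximate that ground truth by
scanning the checkpoint _metadata file for the shared-state paths it
references. The sketch below is a heuristic, not an official Flink API: it
assumes the serialized metadata contains the referenced paths as printable
strings (true for current file-based state handles, but not a stable contract):

```python
import re

def referenced_shared_files(metadata_bytes: bytes) -> set:
    """Heuristically extract .../shared/... paths from a raw checkpoint
    _metadata file by scanning for printable runs that look like paths."""
    pattern = rb'[\x21-\x7e]*?/shared/[\x21-\x7e]+'
    return {m.decode('ascii', 'replace') for m in re.findall(pattern, metadata_bytes)}

def orphaned(shared_listing: set, referenced: set) -> set:
    """Files present in the shared directory but referenced by no retained
    checkpoint's metadata; only these are candidates for deletion."""
    ref_names = {r.rsplit('/', 1)[-1] for r in referenced}
    return {f for f in shared_listing if f.rsplit('/', 1)[-1] not in ref_names}
```

One would have to run referenced_shared_files over the _metadata of every
retained checkpoint and every savepoint before treating anything as orphaned;
when in doubt, keep the file.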


Trystan <entro...@gmail.com> wrote on Fri, May 8, 2020 at 1:05 PM:

> Aha, so incremental checkpointing *does* rely on infinitely-previous
> checkpoint state, regardless of the incremental retention number. The
> documentation wasn't entirely clear about this. One would assume that if
> you retain 3 checkpoints, anything older than the 3rd is irrelevant, but
> that's evidently not true. So it is never safe to delete any files in
> /shared, because we can't know which files belong to the current job (and
> may have lived on from checkpoint 1 even though we're on checkpoint 10 and
> only "retain" 3) and which ones have been abandoned altogether (due to a
> previous run of the job where we didn't restore state).
>
> This is really unfortunate - it can lead to a case where you accumulate a
> huge number of files in S3 and you can't know which ones to delete,
> especially if the job id remains the same (for job mode, they're all
> zeros). So this shared state lives on forever and there is no way to ever
> clean it up, at all. I am surprised that this hasn't been a problem for
> anyone else. Maybe I should just file a feature request for this, at least
> to find some solution for ways to clean up these directories.
>
> I appreciate your patience and help, thank you so much!
>
> Trystan
>
> On Thu, May 7, 2020 at 7:15 PM Congxian Qiu <qcx978132...@gmail.com>
> wrote:
>
>> Hi
>>
>> Yes, only the files used in checkpoints 8, 9, and 10 should be in the
>> checkpoint directory, but you cannot delete files created more than 3
>> minutes ago (because checkpoints 8, 9, and 10 may reuse files created in a
>> previous checkpoint; this is how incremental checkpointing works[1]).
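
The reuse described here can be illustrated with a toy reference-count model
(a simplification for illustration, not Flink's actual bookkeeping): a shared
file becomes deletable only when the last retained checkpoint referencing it
is subsumed, so a file written during checkpoint 1 can still be live at
checkpoint 10.

```python
# Toy model of incremental checkpoint retention: each checkpoint references a
# set of files; a shared file is deletable only once no retained checkpoint
# references it.
from collections import deque

class SharedStateTracker:
    def __init__(self, retained: int):
        self.retained = retained
        self.checkpoints = deque()  # (ckpt_id, files) for retained checkpoints
        self.refcount = {}          # shared file -> number of retaining checkpoints

    def complete_checkpoint(self, ckpt_id, files):
        """Register a new checkpoint; subsume the oldest beyond `retained`.
        Returns the set of shared files that just became deletable."""
        self.checkpoints.append((ckpt_id, set(files)))
        for f in files:
            self.refcount[f] = self.refcount.get(f, 0) + 1
        deletable = set()
        while len(self.checkpoints) > self.retained:
            _, old_files = self.checkpoints.popleft()
            for f in old_files:
                self.refcount[f] -= 1
                if self.refcount[f] == 0:  # no retained checkpoint uses it anymore
                    del self.refcount[f]
                    deletable.add(f)
        return deletable
```

In this model a base file reused by every checkpoint never becomes deletable
no matter how many checkpoints pass, which is why file age is not a safe
deletion criterion.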
>>
>> You can also check the directory structure of checkpoint files[2] for more
>> information; copied from the website here:
>> > The SHARED directory is for state that is possibly part of multiple
>> checkpoints, TASKOWNED is for state that must never be dropped by the
>> JobManager, and EXCLUSIVE is for state that belongs to one checkpoint
>> only.
>>
>> For entropy injection, you can enable it as described in the documentation;
>> it will replace the entropy key with a random string of the specified
>> length so that the files are not all under the same prefix.
>>
>> [1]
>> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>> Best,
>> Congxian
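
For reference, a minimal flink-conf.yaml sketch of the entropy-injection
setup mentioned above (the bucket name and entropy length are placeholders):

```yaml
# Marker in the checkpoint path that will be replaced by a random string
s3.entropy.key: _entropy_
# Length of the injected random string (illustrative choice)
s3.entropy.length: 4

# The marker goes into the checkpoint path itself; "my-bucket" is a placeholder
state.checkpoints.dir: s3://my-bucket/_entropy_/checkpoints
```

With this, checkpoint data files land under s3://my-bucket/<rand>/checkpoints/...,
spreading load across S3 prefixes; the entropy is injected only for data
files, while the metadata file keeps a predictable path (the marker is simply
stripped).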
>>
>>
>> Trystan <entro...@gmail.com> wrote on Thu, May 7, 2020 at 12:54 PM:
>>
>>> Thanks Congxian! To make sure I'm understanding correctly, if I retain 3
>>> incremental checkpoints (say every minute), and I've just completed
>>> checkpoint 10, then anything in shared is from checkpoint 8 and 9 only. So
>>> anything older than ~3 minutes can safely be deleted? The state from
>>> checkpoint 5 doesn't live on in the shared directory - at all?
>>>
>>> I ask because we have run into cases where we end up abandoning the
>>> state, and Flink does not clean up state from, say, a previous iteration of
>>> the job if you don't restore state. We need to remove these files
>>> automatically, but I want to be sure that I don't blow away older files in
>>> the shared dir from earlier, subsumed checkpoints - but you are saying that
>>> isn't possible, and that all subsumed checkpoints will have their /shared
>>> state rewritten or cleaned up as needed, correct?
>>>
>>> As for entropy, where would you suggest to use it? My understanding is
>>> that I don't control anything beyond the checkpoint directory, and since
>>> shared is in that directory I can't put entropy inside the shared directory
>>> itself (which is what I would need).
>>>
>>> Thanks,
>>> Trystan
>>>
>>> On Wed, May 6, 2020 at 7:31 PM Congxian Qiu <qcx978132...@gmail.com>
>>> wrote:
>>>
>>>> Hi
>>>> For the rate limit, could you please try entropy injection[1]?
>>>> For checkpoints, Flink handles the file lifecycle (it deletes a file
>>>> once it will never be used again). A file in the checkpoint directory
>>>> remains as long as some valid checkpoint still references it.
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#entropy-injection-for-s3-file-systems
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> Trystan <entro...@gmail.com> wrote on Thu, May 7, 2020 at 2:46 AM:
>>>>
>>>>> Hello!
>>>>>
>>>>> Recently we ran into an issue when checkpointing to S3. Because S3
>>>>> rate-limits based on prefix, the /shared directory would get slammed and
>>>>> cause S3 throttling. There is no solution for this, because
>>>>> /job/checkpoint/:id/shared is all part of the prefix, and a prefix is
>>>>> limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per
>>>>> second.
>>>>>
>>>>> (source:
>>>>> https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html
>>>>> )
>>>>>
>>>>> Jobs sometimes also completely crash, and they leave state laying
>>>>> around when we bring the job up fresh.
>>>>>
>>>>> Our solutions have been to 1) reduce the number of taskmanagers, 2)
>>>>> reduce state.backend.rocksdb.checkpoint.transfer.thread.num back to 1
>>>>> (we had increased it to speed up checkpoints/savepoints), and 3)
>>>>> manually delete tons of old files in the shared directory.
>>>>>
>>>>> My question:
>>>>> Can we safely apply a Lifecycle Policy to the directory/bucket to
>>>>> remove things? How long is stuff under /shared retained? Is it only for 
>>>>> the
>>>>> duration of the oldest checkpoint, or could it carry forward, untouched,
>>>>> from the very first checkpoint to the very last? This shared checkpoint
>>>>> dir/prefix is currently limiting some scalability of our jobs. I don't
>>>>> believe the _entropy_ trick would help this, because the issue is
>>>>> ultimately that there's a single shared directory.
>>>>>
>>>>> Thank you!
>>>>> Trystan
>>>>>
>>>>
