Hi,

Currently, it is hard to determine which files in the shared folder can be safely deleted; the ground truth is in the checkpoint metadata file. I've created an issue [1] requesting such a feature.
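Here is an illustration of why the metadata is the ground truth. With incremental checkpoints, a file in `shared/` can be deleted only if no retained checkpoint references it. Its age does not matter, because unchanged RocksDB SST files are referenced again by later checkpoints rather than re-uploaded.

The sketch below is a minimal model under stated assumptions:

- It assumes you have already extracted, for each retained checkpoint, the set of shared files its metadata references. Flink offers no such tool at the time of this thread, which is what the issue mentioned above asks for.
- The function name, input format, and file names are all hypothetical.
- A real cleanup would also have to skip files uploaded by a checkpoint that is still in progress. Those files are not yet referenced by any completed checkpoint.

```python
from typing import Dict, Set


def deletable_shared_files(
    all_shared_files: Set[str],
    retained_checkpoints: Dict[int, Set[str]],
) -> Set[str]:
    """Return shared files that no retained checkpoint references.

    `retained_checkpoints` maps a checkpoint id to the set of shared-file
    names its metadata references (hypothetical input format).
    """
    referenced: Set[str] = set()
    for files in retained_checkpoints.values():
        referenced |= files  # union of everything still in use
    return all_shared_files - referenced


# Hypothetical history: "sst-1" was uploaded by checkpoint 1 and is still
# reused by checkpoints 8-10, even though only 3 checkpoints are retained.
retained = {
    8: {"sst-1", "sst-7", "sst-8"},
    9: {"sst-1", "sst-8", "sst-9"},
    10: {"sst-1", "sst-9", "sst-10"},
}
# "orphan-from-old-run" stands for state abandoned by an earlier run of the job.
on_disk = {"sst-1", "sst-2", "sst-7", "sst-8", "sst-9", "sst-10", "orphan-from-old-run"}

print(sorted(deletable_shared_files(on_disk, retained)))  # ['orphan-from-old-run', 'sst-2']
```

Note that `sst-1` survives even though it is the oldest file. An age-based rule would have picked it first.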

[1] https://issues.apache.org/jira/browse/FLINK-17571

Best,
Congxian

On Fri, May 8, 2020 at 1:05 PM, Trystan <entro...@gmail.com> wrote:

> Aha, so incremental checkpointing *does* rely on arbitrarily old checkpoint state, regardless of the incremental retention number. The documentation wasn't entirely clear about this. One would assume that if you retain 3 checkpoints, anything older than the 3rd is irrelevant, but that's evidently not true. So it is never safe to delete any files in /shared, because we can't know which files belong to the current job (and may have lived on from checkpoint 1 even though we're on checkpoint 10 and only "retain" 3) and which ones have been abandoned altogether (due to a previous run of the job where we didn't restore state).
>
> This is really unfortunate: it can lead to a case where you accumulate a huge number of files in S3 and you can't know which ones to delete, especially if the job id remains the same (for job mode, they're all zeros). So this shared state lives on forever and there is no way to ever clean it up, at all. I am surprised that this hasn't been a problem for anyone else. Maybe I should just file a feature request for this, at least to find a way to clean up these directories.
>
> I appreciate your patience and help, thank you so much!
>
> Trystan
>
> On Thu, May 7, 2020 at 7:15 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>
>> Hi,
>>
>> Yes, the checkpoint files should only be those used by checkpoints 8, 9, and 10. However, you cannot delete files created more than 3 minutes ago, because checkpoints 8, 9, and 10 may reuse files created by earlier checkpoints. This is how incremental checkpointing works [1].
>>
>> You can also check the documentation on the checkpoint directory structure [2] for more information. Copied from the website:
>>
>> > The SHARED directory is for state that is possibly part of multiple checkpoints, TASKOWNED is for state that must never be dropped by the JobManager, and EXCLUSIVE is for state that belongs to one checkpoint only.
>>
>> For entropy injection, you can enable it as described in the documentation. It replaces the entropy key with a random string of the specified length, so that the files are not all in the same directory.
>>
>> [1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/state/checkpoints.html#directory-structure
>>
>> Best,
>> Congxian
>>
>>
>> On Thu, May 7, 2020 at 12:54 PM, Trystan <entro...@gmail.com> wrote:
>>
>>> Thanks Congxian! To make sure I'm understanding correctly: if I retain 3 incremental checkpoints (say, one every minute) and I've just completed checkpoint 10, then anything in shared is from checkpoints 8 and 9 only. So anything older than ~3 minutes can safely be deleted? The state from checkpoint 5 doesn't live on in the shared directory at all?
>>>
>>> I ask because we have run into cases where we end up abandoning the state, and Flink does not clean up state from, say, a previous iteration of the job if you don't restore state.
>>> We need to remove these files automatically, but I want to be sure that I don't blow away older files in the shared dir from earlier, subsumed checkpoints. You are saying that isn't possible, and that all subsumed checkpoints will have their /shared state rewritten or cleaned up as needed, correct?
>>>
>>> As for entropy, where would you suggest using it? My understanding is that I don't control anything beyond the checkpoint directory, and since shared is in that directory, I can't put entropy inside the shared directory itself (which is what I would need).
>>>
>>> Thanks,
>>> Trystan
>>>
>>> On Wed, May 6, 2020 at 7:31 PM Congxian Qiu <qcx978132...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> For the rate limit, could you please try entropy injection [1]? For checkpoints, Flink handles the file lifecycle: it deletes a file once it will never be used again. A file in the checkpoint directory stays there as long as the corresponding checkpoint is still valid.
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/filesystems/s3.html#entropy-injection-for-s3-file-systems
>>>>
>>>> Best,
>>>> Congxian
>>>>
>>>>
>>>> On Thu, May 7, 2020 at 2:46 AM, Trystan <entro...@gmail.com> wrote:
>>>>
>>>>> Hello!
>>>>>
>>>>> Recently we ran into an issue when checkpointing to S3. Because S3 rate-limits based on prefix, the /shared directory would get slammed and cause S3 throttling. There is no solution for this, because /job/checkpoint/:id/shared is all part of the prefix, which is limited to 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second.
>>>>>
>>>>> (source: https://docs.aws.amazon.com/AmazonS3/latest/dev/optimizing-performance.html)
>>>>>
>>>>> Jobs sometimes also crash completely, and they leave state lying around when we bring the job up fresh.
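This note concerns the worry that entropy cannot be placed inside `shared/`. My understanding of the S3 entropy-injection documentation linked in this thread is as follows; please verify it against your Flink version:

- The entropy key is configured once, at the front of the checkpoint path. An illustrative configuration is `s3.entropy.key: _entropy_` and `s3.entropy.length: 4`, with `state.checkpoints.dir: s3://my-bucket/_entropy_/checkpoints`.
- For checkpoint data files, Flink substitutes a random string for the key in each file's path. As a result, files in the same logical `shared/` directory land under different S3 key prefixes.
- For other files, such as checkpoint metadata, the key is removed instead. That case is not modelled here.

The sketch below is a simplified Python model of the data-file substitution, not Flink's implementation. The function name, bucket, and job path are placeholders.

```python
import random
import string
from typing import Optional


def inject_entropy(path: str, entropy_key: str = "_entropy_", length: int = 4,
                   rng: Optional[random.Random] = None) -> str:
    """Replace the entropy key in a data-file path with random characters.

    Simplified model for illustration only; metadata files (where the key
    is removed rather than replaced) are not handled here.
    """
    if entropy_key not in path:
        return path  # no entropy key in the path: leave it unchanged
    if rng is None:
        rng = random.Random()
    alphabet = string.ascii_lowercase + string.digits
    token = "".join(rng.choice(alphabet) for _ in range(length))
    return path.replace(entropy_key, token, 1)  # replace only the first occurrence


rng = random.Random(42)  # seeded so the run is reproducible
base = "s3://my-bucket/_entropy_/checkpoints/job-id/shared/"
for name in ["file-a", "file-b", "file-c"]:
    # Each file gets its own random first path segment, i.e. its own S3 prefix.
    print(inject_entropy(base + name, rng=rng))
```

In this model, the logical path after the random segment stays the same. The first path component, which is what spreads requests across S3 prefixes, differs per file.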
>>>>>
>>>>> Our solutions have been to 1) reduce the number of taskmanagers, 2) reduce state.backend.rocksdb.checkpoint.transfer.thread.num back to 1 (we had increased it to speed up checkpoints/savepoints), and 3) manually delete tons of old files in the shared directory.
>>>>>
>>>>> My question:
>>>>> Can we safely apply a Lifecycle Policy to the directory/bucket to remove things? How long is stuff under /shared retained? Is it only for the duration of the oldest checkpoint, or could it carry forward, untouched, from the very first checkpoint to the very last? This shared checkpoint dir/prefix is currently limiting the scalability of some of our jobs. I don't believe the _entropy_ trick would help here, because the issue is ultimately that there's a single shared directory.
>>>>>
>>>>> Thank you!
>>>>> Trystan
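On the lifecycle-policy question: an S3 expiration rule acts on object age. Incremental checkpoints, as discussed in this thread, keep an old file alive for as long as any retained checkpoint references it. An age-only rule can therefore delete a file that a valid checkpoint still needs, and restoring from that checkpoint would then presumably fail.

Below is a minimal sketch of that failure mode. The function name, file names, timestamps, and the 7-day rule are all made up for illustration.

```python
from datetime import datetime, timedelta
from typing import Dict, Set


def expired_but_referenced(
    upload_times: Dict[str, datetime],
    referenced: Set[str],
    now: datetime,
    max_age: timedelta,
) -> Set[str]:
    """Files an age-based expiry rule would delete even though a retained
    checkpoint still references them."""
    expired = {f for f, t in upload_times.items() if now - t > max_age}
    return expired & referenced


now = datetime(2020, 5, 8, 12, 0)
uploads = {
    "sst-1": now - timedelta(days=30),    # uploaded long ago, still reused
    "sst-2": now - timedelta(days=29),    # no longer referenced: safe to expire
    "sst-10": now - timedelta(minutes=1), # just uploaded
}
still_needed = {"sst-1", "sst-10"}  # hypothetical: referenced by retained checkpoints

print(expired_but_referenced(uploads, still_needed, now, timedelta(days=7)))  # {'sst-1'}
```

A non-empty result means the rule would break a retained checkpoint. Deletion has to be driven by what the checkpoint metadata references, not by object age alone.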