Re: RocksDB state on HDFS seems not being cleaned up
Yes, the State Processor API cannot read window state for now; this issue is tracked at [1].

[1] https://issues.apache.org/jira/browse/FLINK-13095

Best
Yun Tang

From: shuwen zhou
Date: Monday, November 18, 2019 at 12:31 PM
To: user
Subject: Fwd: RocksDB state on HDFS seems not being cleaned up

Forwarding to the user group again since the mail server rejected it last time.

-- Forwarded message --
From: shuwen zhou <jaco...@gmail.com>
Date: Wed, 13 Nov 2019 at 13:33
Subject: Re: RocksDB state on HDFS seems not being cleaned up
To: Yun Tang <myas...@live.com>
Cc: user <user@flink.apache.org>

Hi Yun,
After my investigation, I found that the files are not orphan files; they are still recorded in the latest checkpoint's _metadata file.
I looked through the API you mentioned, https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html , and it seems the state that can be accessed is limited to user-defined state. I suspect the outdated state belongs to a window reduce state, so I would like to access window reduce state. It seems this API cannot provide such functionality, can it?

On Thu, 7 Nov 2019 at 18:08, Yun Tang <myas...@live.com> wrote:
Yes, just sum all file sizes within the checkpoint meta to get the full checkpoint size (this omits some byte-stream state handles, but is nearly accurate). BTW, I think the user mailing list is the better place for this thread; I have already sent this mail there.

Best
Yun Tang

From: shuwen zhou <jaco...@gmail.com>
Date: Thursday, November 7, 2019 at 12:02 PM
To: Yun Tang <myas...@live.com>
Cc: dev <d...@flink.apache.org>, Till Rohrmann <trohrm...@apache.org>
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun,
Thank you for your detailed explanation; it gives me a lot to research. I think:
1. I should try reducing "state.checkpoints.num-retained", maybe to 3, which could decrease the size of the shared folder.
2. Does Flink 1.9.0 have the possibility of orphan files? It seems the answer is "yes, maybe". I could use the State Processor API you mentioned to figure it out and get back to you.
3. I had a look at the file /flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata; it records a lot of file names like hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03. Summing those files' sizes gives the total size of each checkpoint, am I correct?
4. My checkpoint interval is 16 minutes.

On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:
Hi Shuwen,
Since you have just 10 "chk-" folders as expected, and when subsuming checkpoints the "chk-" folder is removed only after the shared state has been successfully removed [1], I think you should not have too many orphan state files left. To verify this, you could use the State Processor API [2] to load your checkpoints and compare against all the files under the "shared" folder to see whether there are too many orphan files. If there are, we might consider the custom compaction filter feature of FRocksDB.
Secondly, your judgment of "20GB each checkpoint" might not be accurate when RocksDB incremental checkpointing is enabled; what the UI shows is only the incremental size [3]. I suggest you sum the file sizes within your checkpoint meta to get the accurate size of each checkpoint.
Last but not least, RocksDB's compaction-filter feature deletes expired data only during compaction [4], so I'm afraid you might need to look at your RocksDB LOG files to see the frequency of compaction on the task managers. I also think the increasing size might be related to the interval of your checkpoints; at what interval do you execute checkpoints?
[1] https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] htt
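To make the "sum the files referenced in the checkpoint meta" suggestion concrete, here is a minimal sketch in Python. The helper names and sample sizes are hypothetical: in practice the referenced paths would come from scanning the checkpoint's _metadata file, and the sizes from something like `hdfs dfs -du .../shared`.

```python
# Hypothetical sketch: estimate a checkpoint's full size and spot orphan files
# by comparing the paths referenced in its _metadata against an HDFS listing.
# `du` and `referenced` below are stand-in sample data.

def full_checkpoint_size(referenced, du):
    """Sum the sizes (bytes) of shared files referenced by one checkpoint."""
    return sum(du[p] for p in referenced if p in du)

def orphan_files(all_referenced, du):
    """Files present on HDFS but referenced by no retained checkpoint."""
    return set(du) - set(all_referenced)

du = {
    "shared/e9e10c8a": 40 * 1024**2,  # 40 MiB, referenced
    "shared/a1b2c3d4": 25 * 1024**2,  # 25 MiB, referenced
    "shared/deadbeef": 10 * 1024**2,  # on HDFS, referenced by no checkpoint
}
referenced = ["shared/e9e10c8a", "shared/a1b2c3d4"]

print(full_checkpoint_size(referenced, du))  # 68157440 (65 MiB)
print(orphan_files(referenced, du))          # {'shared/deadbeef'}
```

As Yun notes above, this omits byte-stream state handles embedded directly in the metadata, so the result is only nearly accurate.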
Re: RocksDB state on HDFS seems not being cleaned up
Yes, just sum all file sizes within the checkpoint meta to get the full checkpoint size (this omits some byte-stream state handles, but is nearly accurate). BTW, I think the user mailing list is the better place for this thread; I have already sent this mail there.

Best
Yun Tang

From: shuwen zhou
Date: Thursday, November 7, 2019 at 12:02 PM
To: Yun Tang
Cc: dev, Till Rohrmann
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun,
Thank you for your detailed explanation; it gives me a lot to research. I think:
1. I should try reducing "state.checkpoints.num-retained", maybe to 3, which could decrease the size of the shared folder.
2. Does Flink 1.9.0 have the possibility of orphan files? It seems the answer is "yes, maybe". I could use the State Processor API you mentioned to figure it out and get back to you.
3. I had a look at the file /flink/c344b61c456af743e4568a70b626837b/chk-172/_metadata; it records a lot of file names like hdfs://hadoop/flink/c344b61c456af743e4568a70b626837b/shared/e9e10c8a-6d73-48e4-9e17-45838d276b03. Summing those files' sizes gives the total size of each checkpoint, am I correct?
4. My checkpoint interval is 16 minutes.

On Wed, 6 Nov 2019 at 15:57, Yun Tang <myas...@live.com> wrote:
Hi Shuwen,
Since you have just 10 "chk-" folders as expected, and when subsuming checkpoints the "chk-" folder is removed only after the shared state has been successfully removed [1], I think you should not have too many orphan state files left. To verify this, you could use the State Processor API [2] to load your checkpoints and compare against all the files under the "shared" folder to see whether there are too many orphan files. If there are, we might consider the custom compaction filter feature of FRocksDB.
Secondly, your judgment of "20GB each checkpoint" might not be accurate when RocksDB incremental checkpointing is enabled; what the UI shows is only the incremental size [3]. I suggest you sum the file sizes within your checkpoint meta to get the accurate size of each checkpoint. Last but not least, RocksDB's compaction-filter feature deletes expired data only during compaction [4], so I'm afraid you might need to look at your RocksDB LOG files to see the frequency of compaction on the task managers. I also think the increasing size might be related to the interval of your checkpoints; at what interval do you execute checkpoints?

[1] https://github.com/apache/flink/blob/2ea14169a1997434d45d6f1da6dfe9acd6bd8da3/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java#L264
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html
[3] https://issues.apache.org/jira/browse/FLINK-13390
[4] https://github.com/facebook/rocksdb/blob/834feaff05a4bf7ae49c736305d5eb180aed4011/include/rocksdb/compaction_filter.h#L61

Best
Yun Tang

From: shuwen zhou <jaco...@gmail.com>
Date: Wednesday, November 6, 2019 at 12:02 PM
To: dev <d...@flink.apache.org>, Yun Tang <myas...@live.com>, Till Rohrmann <trohrm...@apache.org>
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Yun and Till,
Thank you for your response.
For @Yun
1. No, I just renamed the checkpoint directory, since the directory name contains company data. Sorry for the confusion.
2. Yes, I set state.check
Re: RocksDB state on HDFS seems not being cleaned up
This should be sent to the user mailing list. Moving it here...

Original Message
Sender: bupt_ljy
Recipient: dev
Date: Tuesday, Nov 5, 2019 21:13
Subject: Re: RocksDB state on HDFS seems not being cleaned up

Hi Shuwen,
"Shared" means that the state files are shared among multiple checkpoints, which happens when you enable incremental checkpointing [1]. Therefore, it's reasonable that the size keeps growing if you set "state.checkpoints.num-retained" to a big value.

[1] https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html

Best,
Jiayi Liao

Original Message
Sender: shuwen zhou
Recipient: dev
Date: Tuesday, Nov 5, 2019 17:59
Subject: RocksDB state on HDFS seems not being cleaned up

Hi Community,
I have a job running on Flink 1.9.0 on YARN, with RocksDB on HDFS and incremental checkpointing enabled. I have some MapState in code with the following config:

val ttlConfig = StateTtlConfig
  .newBuilder(Time.minutes(30))
  .updateTtlOnCreateAndWrite()
  .cleanupInBackground()
  .cleanupFullSnapshot()
  .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
  .build()

After running for around 2 days, I observed the checkpoint folder showing:

44.4 M  /flink-chk743e4568a70b626837b/chk-40
65.9 M  /flink-chk743e4568a70b626837b/chk-41
91.7 M  /flink-chk743e4568a70b626837b/chk-42
96.1 M  /flink-chk743e4568a70b626837b/chk-43
48.1 M  /flink-chk743e4568a70b626837b/chk-44
71.6 M  /flink-chk743e4568a70b626837b/chk-45
50.9 M  /flink-chk743e4568a70b626837b/chk-46
90.2 M  /flink-chk743e4568a70b626837b/chk-37
49.3 M  /flink-chk743e4568a70b626837b/chk-38
96.9 M  /flink-chk743e4568a70b626837b/chk-39
797.9 G /flink-chk743e4568a70b626837b/shared

The ./shared folder size keeps increasing and the folder does not seem to be cleaned up. However, when I disabled incremental checkpointing, the expired full snapshots were removed automatically. Is there any way to remove outdated state on HDFS to stop it from growing? Thanks.

--
Best Wishes,
Shuwen Zhou
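To illustrate why the shared folder only shrinks when a file stops being referenced by any retained checkpoint, here is a simplified reference-counting sketch in Python. The class and method names are illustrative only, not Flink's actual implementation: a shared SST file becomes deletable only once every retained checkpoint referencing it has been subsumed.

```python
# Simplified sketch of incremental-checkpoint bookkeeping: each retained
# checkpoint registers the shared files it references; a file is physically
# deletable only when its reference count drops to zero.

class SharedStateRegistry:
    def __init__(self):
        self.refcount = {}

    def register(self, checkpoint_files):
        """A new checkpoint completes and references these shared files."""
        for f in checkpoint_files:
            self.refcount[f] = self.refcount.get(f, 0) + 1

    def unregister(self, checkpoint_files):
        """A checkpoint is subsumed; return the files now safe to delete."""
        deletable = []
        for f in checkpoint_files:
            self.refcount[f] -= 1
            if self.refcount[f] == 0:
                del self.refcount[f]
                deletable.append(f)
        return deletable

reg = SharedStateRegistry()
reg.register(["sst-1", "sst-2"])           # chk-40 references sst-1, sst-2
reg.register(["sst-2", "sst-3"])           # chk-41 reuses sst-2, adds sst-3
print(reg.unregister(["sst-1", "sst-2"]))  # chk-40 subsumed -> ['sst-1']
```

Note that sst-2 survives the subsumption of chk-40 because chk-41 still references it; with a large "state.checkpoints.num-retained", many such files stay pinned, which is why the shared folder grows.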