Hi,

> 1. After multiple full checkpoints and a NATIVE savepoint the size was
> unchanged. I'm wondering if RocksDB compaction is ineffective because we
> never update key values? The state is nearly fully composed of key space.
> Do keys not get freed by the RocksDB compaction filter for TTL?

Regarding the TTL-related questions: has your job been running for 30 days?
TTL is checked based on the last time a key was created or updated.

Regarding "I'm wondering if RocksDB compaction is ineffective because we never
update key values": periodic compaction can speed up the cleanup of expired
state entries, especially for entries that are rarely accessed [1]; maybe you
can try setting it. A minimal configuration sketch is below. BTW, are there
any deletion operations in your job?
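For reference, this is roughly what such a TTL configuration could look like
on the map-state descriptor. The descriptor name and types here are only
placeholders, and the two-argument cleanupInRocksdbCompactFilter overload that
turns on periodic compaction is only available in recent Flink versions; on
older versions use the single-argument variant.

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.Types;

import java.time.Duration;

public class TtlConfigSketch {

    static MapStateDescriptor<Long, String> buildDescriptor() {
        StateTtlConfig ttlConfig = StateTtlConfig
                // 30-day TTL; the timestamp is refreshed only on create/update,
                // so entries that are never updated expire 30 days after creation.
                .newBuilder(Time.days(30))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                // Drops expired entries while a full (canonical) snapshot is
                // written; per the docs this does not apply to RocksDB
                // incremental checkpoints.
                .cleanupFullSnapshot()
                // RocksDB compaction-filter cleanup: re-read the current
                // timestamp every 1000 processed entries and, in recent Flink
                // versions, force a periodic compaction every 24h so that
                // rarely-accessed files also get their expired entries dropped.
                .cleanupInRocksdbCompactFilter(1000, Duration.ofHours(24))
                .build();

        // Placeholder descriptor: microsecond timestamp -> payload.
        MapStateDescriptor<Long, String> descriptor =
                new MapStateDescriptor<>("eventsByMicros", Types.LONG, Types.STRING);
        descriptor.enableTimeToLive(ttlConfig);
        return descriptor;
    }
}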
> 2. That should work but will doing that "reset the clock" for the TTL?

No, the TTL timestamp is stored as part of the key/value entry and therefore
will not be reset. I think you can try "TTL periodic compaction" [1] first.

3. Yes, restoring from a canonical savepoint can bypass FLINK-34050, and a
canonical savepoint should be generated first.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state

Cliff Resnick <cre...@gmail.com> wrote on Mon, Jun 3, 2024 at 02:44:

> Hi everyone,
>
> We have a Flink application that has a very large and perhaps unusual state.
> The basic shape of it is a very large and somewhat random keyed-stream
> partition space, each with a continuously growing map state keyed by
> microsecond-time Long values. There are never any overwrites in the map
> state, which is monotonic per partition key. Map state was chosen over list
> state in the hope that we can manage a sliding window using TTL. Using
> RocksDB incremental checkpointing, the app runs very well despite the large
> total checkpoint size. Our current checkpoint size is 3.2 TB.
>
> We have multiple questions around space amplification problems when using
> the RocksDB backend, and I'm wondering if anyone can suggest or confirm
> answers.
>
> 1. Using LEVEL compaction we have not seen any decrease in total checkpoint
> size through TTL compaction. To test the TTL, I cut the period from 60 to 30
> days (we have well over 60 days of processing time), enabled
> cleanupFullSnapshot() and ran a test job without incremental checkpointing
> enabled. After multiple full checkpoints and a NATIVE savepoint the size was
> unchanged. I'm wondering if RocksDB compaction is ineffective because we
> never update key values? The state is nearly fully composed of key space.
> Do keys not get freed by the RocksDB compaction filter for TTL?
>
> 2. I'm wondering if FIFO compaction is a solution for the above. To move to
> that we would need to first take a canonical savepoint and then redeploy
> with RocksDB/FIFO. That should work, but will doing that "reset the clock"
> for the TTL? Given its nature, though, I am leaning toward this as our only
> option.
>
> 3. Rescaling is a problem because of this issue:
> https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not
> yet released. Because of this bug the checkpoint size scales somewhat larger
> than is proportionate to the job rescaling. For example, if we go from 44
> slots to 60, the checkpoint will scale from 3.2 TB to 4.9 TB. Before 1.19.1
> is released, can we cherry-pick the fix and create our own Docker image, or
> will restoring from a canonical savepoint as described above sidestep this
> bug?
>
> If anyone can help with any insights, please do!

--
Best,
Yanfei