Hi,

> 1. After multiple full checkpoints and a NATIVE savepoint the size was 
> unchanged. I'm wondering if RocksDb compaction is  because we never update 
> key values? The state is nearly fully composed of keys' space. Do keys not 
> get freed using RocksDb compaction filter for TTL?

Regarding TTL-related questions, has your job been running for 30
days? TTL is checked based on the last time a key was created or
updated.

Regarding “ I'm wondering if RocksDb compaction is  because we never
update key values”: Periodic compaction could speed up expired state
entries cleanup, especially for state entries rarely accessed[1],
maybe you can try to set it. BTW, are there any deletion operations in
your job?

> 2. That should work but will doing that "reset the clock" for the TTL?

No, TTL is stored as part of KV and therefore will not be reset. I
think you can try “TTL Periodic compaction”[1] first.

3. Yes, restoring from a canonical savepoint can bypass FLINK-34050,
and a canonical savepoint should be generated first.

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state

Cliff Resnick <cre...@gmail.com> 于2024年6月3日周一 02:44写道:
>
> Hi everyone,
>
>
> We have a Flink application that has a very large and perhaps unusual state. 
> The basic shape of it is a very large and somewhat random keyed-stream 
> partition space, each with a continuously growing map-state keyed by 
> microsecond time Long values. There are never any overwrites in the map state 
> which is monotonic per partition key.  Map state was chosen over list state 
> in the hope that we can manage a sliding window using TTL. Using RocksDB 
> incremental checkpointing, the app runs very well despite the large total 
> checkpoint size. Our current checkpoint size is 3.2TB.
>
>
> We have multiple questions around space amplification problems when using the 
> RocksDB backend and I'm wondering if anyone can suggest or confirm answers.
>
>
>
> 1. Using LEVEL compaction we have not seen any decrease in total checkpoint 
> size through TTL compaction. To test the TTL, I cut the period from 60 to 30 
> days (we have well over 60 days processing time), enabled 
> cleanupFullSnapshot() and ran a test job without incremental checkpointing 
> enabled. After multiple full checkpoints and a NATIVE savepoint the size was 
> unchanged. I'm wondering if RocksDb compaction is  because we never update 
> key values? The state is nearly fully composed of keys' space. Do keys not 
> get freed using RocksDb compaction filter for TTL?
>
> 2. I'm wondering if FIFO compaction is a solution for above. To move to that 
> that we will need to first take a canonical savepoint then redeploy with 
> RocksDB/FIFO. That should work but will doing that "reset the clock" for the 
> TTL? Given it's nature though, I am leaning to this as our only option.
>
>
> 3. Rescaling is a problem because of this issue: 
> https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not 
> yet released. Because of this bug  the checkpoint size scales somewhat larger 
> than is proportionate to the job rescaling. For example if we go from 44 
> slots to 60, the checkpoint will scale from 3.2 TB to 4.9 TB. Before 1.19.1 
> is released can cherry-pick the fix and create our own Docker image, or will 
> restoring from a canonical savepoint as described above sidestep this bug?
>
>
> If anyone can help with any insights, please do!
>
>



-- 
Best,
Yanfei

Reply via email to