Re: TTL issue with large RocksDB keyed state

2024-06-03 Thread Yanfei Lei
Hi,

> 1. After multiple full checkpoints and a NATIVE savepoint the size was
> unchanged. I'm wondering if RocksDB compaction is ineffective because we
> never update key values? The state is composed almost entirely of key
> space. Do keys not get freed by the RocksDB compaction filter for TTL?

Regarding the TTL-related questions: has your job been running for at
least 30 days? TTL is evaluated against the last time a key was created
or updated.
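
For example, a minimal sketch of how the update type controls which
operations refresh that timestamp (not tied to your job's exact setup):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // OnCreateAndWrite (the default) refreshes the TTL timestamp on create
    // and on every write; OnReadAndWrite also refreshes it on reads.
    // Newer Flink versions also accept java.time.Duration in newBuilder().
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(30))
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
        .build();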

Regarding “I'm wondering if RocksDB compaction is ineffective because we
never update key values”: periodic compaction can speed up the cleanup of
expired state entries, especially entries that are rarely accessed [1];
maybe you can try enabling it. BTW, are there any deletion operations in
your job?
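
A minimal sketch of enabling periodic compaction, following the pattern in
[1] (the numbers are placeholders to tune, and the two-argument overload
requires a recent Flink version):

    import java.time.Duration;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // Re-query the current timestamp in the RocksDB compaction filter after
    // every 1000 processed entries, and additionally schedule a periodic
    // compaction every 7 days so rarely-touched SST files are rewritten too.
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(30))
        .cleanupInRocksdbCompactFilter(1000, Duration.ofDays(7))
        .build();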

> 2. That should work, but will doing that "reset the clock" for the TTL?

No, the TTL timestamp is stored as part of each key-value entry and
therefore will not be reset. I think you can try the TTL periodic
compaction [1] first.

3. Yes, restoring from a canonical savepoint can bypass FLINK-34050,
and a canonical savepoint should be generated first.
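
For example, with the CLI (a sketch; adjust the job ID, target directory
and deployment specifics):

    # Trigger a savepoint in the canonical (state-backend-independent)
    # format, then restore the redeployed job from the resulting path.
    bin/flink savepoint --type canonical <jobId> [targetDirectory]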

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/state/#cleanup-of-expired-state

Cliff Resnick wrote on Mon, Jun 3, 2024 at 02:44:
>
> Hi everyone,
>
>
> We have a Flink application that has a very large and perhaps unusual
> state. The basic shape of it is a very large and somewhat random
> keyed-stream partition space, each partition with a continuously growing
> map state keyed by microsecond-timestamp Long values. There are never any
> overwrites in the map state, which is monotonic per partition key. Map
> state was chosen over list state in the hope that we can manage a sliding
> window using TTL. Using RocksDB incremental checkpointing, the app runs
> very well despite the large total checkpoint size. Our current checkpoint
> size is 3.2 TB.
>
>
> We have multiple questions around space amplification problems when using the 
> RocksDB backend and I'm wondering if anyone can suggest or confirm answers.
>
>
>
> 1. Using LEVEL compaction we have not seen any decrease in total
> checkpoint size through TTL compaction. To test the TTL, I cut the period
> from 60 to 30 days (we have well over 60 days of processing time), enabled
> cleanupFullSnapshot() and ran a test job without incremental checkpointing
> enabled. After multiple full checkpoints and a NATIVE savepoint the size
> was unchanged. I'm wondering if RocksDB compaction is ineffective because
> we never update key values? The state is composed almost entirely of key
> space. Do keys not get freed by the RocksDB compaction filter for TTL?
>
> 2. I'm wondering if FIFO compaction is a solution for the above. To move
> to that we would need to first take a canonical savepoint and then
> redeploy with RocksDB/FIFO. That should work, but will doing that "reset
> the clock" for the TTL? Given its nature, though, I am leaning toward this
> as our only option.
>
>
> 3. Rescaling is a problem because of this issue:
> https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not
> yet released. Because of this bug, the checkpoint size grows more than
> proportionally when the job is rescaled. For example, if we go from 44
> slots to 60, the checkpoint grows from 3.2 TB to 4.9 TB. Before 1.19.1 is
> released we can cherry-pick the fix and create our own Docker image, or
> will restoring from a canonical savepoint as described above sidestep this
> bug?
>
>
> If anyone can help with any insights, please do!
>
>



-- 
Best,
Yanfei


TTL issue with large RocksDB keyed state

2024-06-02 Thread Cliff Resnick
Hi everyone,


We have a Flink application that has a very large and perhaps unusual
state. The basic shape of it is a very large and somewhat random
keyed-stream partition space, each partition with a continuously growing
map state keyed by microsecond-timestamp Long values. There are never any
overwrites in the map state, which is monotonic per partition key. Map
state was chosen over list state in the hope that we can manage a sliding
window using TTL. Using RocksDB incremental checkpointing, the app runs
very well despite the large total checkpoint size. Our current checkpoint
size is 3.2 TB.
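
A simplified sketch of the state declaration (illustrative names and value
type, not our exact code):

    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // Per partition key: an ever-growing map keyed by event time in
    // microseconds; TTL is intended to act as a sliding retention window.
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(60))
        .cleanupInRocksdbCompactFilter(1000)
        .build();

    MapStateDescriptor<Long, byte[]> eventsByMicros =
        new MapStateDescriptor<>("eventsByMicros", Long.class, byte[].class);
    eventsByMicros.enableTimeToLive(ttlConfig);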


We have multiple questions around space amplification problems when using
the RocksDB backend and I'm wondering if anyone can suggest or confirm
answers.



1. Using LEVEL compaction we have not seen any decrease in total
checkpoint size through TTL compaction. To test the TTL, I cut the period
from 60 to 30 days (we have well over 60 days of processing time), enabled
cleanupFullSnapshot() and ran a test job without incremental checkpointing
enabled. After multiple full checkpoints and a NATIVE savepoint the size
was unchanged. I'm wondering if RocksDB compaction is ineffective because
we never update key values? The state is composed almost entirely of key
space. Do keys not get freed by the RocksDB compaction filter for TTL?
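
The TTL configuration for that test looked roughly like this (simplified):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.time.Time;

    // TTL reduced from 60 to 30 days for the test. Per the docs,
    // cleanupFullSnapshot() is not applicable to RocksDB incremental
    // checkpoints and never shrinks the local RocksDB state; it only
    // filters expired entries out of the full snapshot being written.
    StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.days(30))
        .cleanupFullSnapshot()
        .build();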

2. I'm wondering if FIFO compaction is a solution for the above. To move
to that we would need to first take a canonical savepoint and then
redeploy with RocksDB/FIFO. That should work, but will doing that "reset
the clock" for the TTL? Given its nature, though, I am leaning toward this
as our only option.
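
(I assume the switch itself would be a RocksDB backend option along the
following lines; the exact key and supported values need to be verified
against the docs for our Flink version:)

    # Assumed flink-conf setting for the RocksDB compaction style.
    state.backend.rocksdb.compaction.style: FIFO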


3. Rescaling is a problem because of this issue:
https://issues.apache.org/jira/browse/FLINK-34050. The fix for this is not
yet released. Because of this bug, the checkpoint size grows more than
proportionally when the job is rescaled. For example, if we go from 44
slots to 60, the checkpoint grows from 3.2 TB to 4.9 TB. Before 1.19.1 is
released we can cherry-pick the fix and create our own Docker image, or
will restoring from a canonical savepoint as described above sidestep this
bug?


If anyone can help with any insights, please do!