Hi everyone,

We have a Flink application with a very large and perhaps unusual state. Its
basic shape is a large, somewhat random keyed-stream partition space, where
each key holds a continuously growing map state keyed by microsecond-timestamp
Long values. Entries in the map state are never overwritten, and the map keys
are monotonically increasing per partition key. Map state was chosen over list
state in the hope that we could manage a sliding window using TTL. With RocksDB
incremental checkpointing the app runs very well despite the large total
checkpoint size; our current checkpoint size is 3.2 TB.
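
For context, here is a simplified sketch of the per-key state (the class name,
String key type and byte[] payload are placeholders, and the 60-day TTL stands
in for the sliding window we want):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Simplified sketch only: key and value types are placeholders.
public class AppendOnlyWindowFn extends KeyedProcessFunction<String, byte[], Void> {

    private transient MapState<Long, byte[]> events;

    @Override
    public void open(Configuration parameters) {
        // TTL stands in for the sliding window; entries are written once and never updated.
        StateTtlConfig ttl = StateTtlConfig
                .newBuilder(Time.days(60))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

        MapStateDescriptor<Long, byte[]> desc =
                new MapStateDescriptor<>("events", Long.class, byte[].class);
        desc.enableTimeToLive(ttl);
        events = getRuntimeContext().getMapState(desc);
    }

    @Override
    public void processElement(byte[] value, Context ctx, Collector<Void> out) throws Exception {
        // Map key is a microsecond timestamp, monotonically increasing per partition key
        // (assumes event-time timestamps are assigned upstream).
        events.put(ctx.timestamp() * 1000L, value);
    }
}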


We have several questions about space amplification when using the RocksDB
backend, and I'm hoping someone can suggest or confirm answers.



1. Using LEVEL compaction we have not seen any decrease in total checkpoint
size through TTL compaction. To test the TTL, I cut the period from 60 to
30 days (we have well over 60 days of processing time), enabled
cleanupFullSnapshot(), and ran a test job without incremental checkpointing
enabled. After multiple full checkpoints and a NATIVE savepoint, the size was
unchanged. I'm wondering whether RocksDB compaction is ineffective here
because we never update key values? The state consists almost entirely of key
space. Do keys not get freed by the RocksDB compaction filter for TTL?
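
To make question 1 concrete, this is roughly the TTL cleanup setup I'm asking
about. My understanding is that cleanupFullSnapshot() only takes effect when a
full snapshot/savepoint is taken, while cleanupInRocksdbCompactFilter() is what
attaches the compaction filter in the RocksDB backend (the 1000-entries
threshold below is just illustrative):

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;

public class TtlCleanupSketch {
    // 30-day TTL as in the test; cleanupFullSnapshot() applies to full
    // snapshots/savepoints, cleanupInRocksdbCompactFilter() should let the
    // RocksDB compaction filter drop expired entries during compaction.
    static StateTtlConfig testTtl() {
        return StateTtlConfig
                .newBuilder(Time.days(30))
                .cleanupFullSnapshot()
                .cleanupInRocksdbCompactFilter(1000L)
                .build();
    }
}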

2. I'm wondering if FIFO compaction is a solution to the above. To move to it,
we would first need to take a canonical savepoint and then redeploy with
RocksDB/FIFO. That should work, but would doing so "reset the clock" for the
TTL? Given its nature, though, I am leaning toward this as our only option.
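
If we do go the FIFO route, my rough understanding is that it would be wired
in through a custom options factory on the state backend, something like the
sketch below (the class name is mine, and FIFO-specific knobs such as max
table files size are left at defaults and would still need tuning):

import java.util.Collection;

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBOptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionStyle;
import org.rocksdb.DBOptions;

public class FifoCompactionFactory implements RocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        return currentOptions; // DB-level options unchanged
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // Switch every column family (i.e. every state) to FIFO compaction.
        return currentOptions.setCompactionStyle(CompactionStyle.FIFO);
    }
}

// Wiring it up, roughly:
// EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true); // incremental
// backend.setRocksDBOptions(new FifoCompactionFactory());
// env.setStateBackend(backend);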


3. Rescaling is a problem because of this issue:
https://issues.apache.org/jira/browse/FLINK-34050. The fix for it is not yet
released. Because of this bug, the checkpoint size grows more than
proportionally to the rescaling; for example, if we go from 44 slots to 60,
the checkpoint grows from 3.2 TB to 4.9 TB. Until 1.19.1 is released, we could
cherry-pick the fix and build our own Docker image, or would restoring from a
canonical savepoint as described above sidestep this bug?


If anyone can help with any insights, please do!
