[ 
https://issues.apache.org/jira/browse/FLINK-14197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335188#comment-17335188
 ] 

Thomas edited comment on FLINK-14197 at 4/29/21, 7:13 AM:
----------------------------------------------------------

I'm from the same team as Oliver and we have finally found out why the state 
was increasing: it's the MANIFEST RocksDB internal file.
 Looks like in incremental checkpointing mode that file keeps growing until it 
reaches max_manifest_file_size = 1GB.
 It's possible to change that parameter to a smaller value (we've tested it with 
even 6KB and that works) by setting DBOptions.setMaxManifestFileSize.
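
For reference, a minimal sketch of lowering that limit via the OptionsFactory 
interface available in Flink 1.9 (the backend variable and the 64 MB cap are 
purely illustrative, not recommended values):

{code:java}
import org.apache.flink.contrib.streaming.state.OptionsFactory;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

// `backend` is assumed to be the job's RocksDBStateBackend instance
backend.setOptions(new OptionsFactory() {
    @Override
    public DBOptions createDBOptions(DBOptions currentOptions) {
        // default max_manifest_file_size is 1 GB; 64 MB is only an illustrative cap
        return currentOptions.setMaxManifestFileSize(64 * 1024 * 1024L);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOptions) {
        return currentOptions;
    }
});
{code}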

The question is whether such behaviour is correct for incremental checkpointing 
mode, since the manifest can grow up to 1GB (by default) while the state itself 
could be very small, and those changes are propagated to the checkpoint storage.



> Increasing trend for state size of keyed stream using ProcessWindowFunction 
> with ProcessingTimeSessionWindows
> -------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14197
>                 URL: https://issues.apache.org/jira/browse/FLINK-14197
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing, Runtime / State Backends
>    Affects Versions: 1.9.0
>         Environment: Tested with:
>  * Local Flink Mini Cluster running from IDE
>  * Flink standalone cluster run in docker
>            Reporter: Oliver Kostera
>            Priority: Major
>              Labels: stale-major
>
> I'm using *ProcessWindowFunction* in a keyed stream with the following 
> definition:
> {code:java}
>         final SingleOutputStreamOperator<Message> processWindowFunctionStream =
>             keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.milliseconds(100)))
>                 .process(new CustomProcessWindowFunction()).uid(PROCESS_WINDOW_FUNCTION_OPERATOR_ID)
>                 .name("Process window function");
> {code}
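>
> For reference, the pass-through CustomProcessWindowFunction looks roughly like 
> this (the String key type is only an assumption of this sketch):
> {code:java}
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
> import org.apache.flink.util.Collector;
>
> public class CustomProcessWindowFunction
>         extends ProcessWindowFunction<Message, Message, String, TimeWindow> {
>
>     @Override
>     public void process(String key, Context context, Iterable<Message> elements, Collector<Message> out) {
>         // forward every element of the window unchanged
>         for (Message message : elements) {
>             out.collect(message);
>         }
>     }
> }
> {code}
>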
> My checkpointing configuration uses the RocksDB state backend with incremental 
> checkpointing and EXACTLY_ONCE mode.
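>
> A minimal sketch of that setup (the checkpoint interval and path below are 
> illustrative only, not the actual job values):
> {code:java}
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.CheckpointingMode;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
> env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
> // second argument enables incremental checkpoints; the constructor may throw IOException
> env.setStateBackend(new RocksDBStateBackend("file:///tmp/checkpoints", true));
> {code}
>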
> At runtime I noticed that even though data ingestion is static (same keys and 
> frequency of messages), the size of the process window operator's state keeps 
> increasing. I tried to reproduce it with a minimal similar setup here: 
> https://github.com/loliver1234/flink-process-window-function and was able to 
> do so.
> Testing conditions:
> - RabbitMQ source with Exactly-once guarantee and 65k prefetch count
> - RabbitMQ sink to collect messages
> - Simple ProcessWindowFunction that only passes messages through
> - Stream time characteristic set to TimeCharacteristic.ProcessingTime
> Testing scenario:
> - Start the Flink job and check the initial state size - State Size: 127 KB
> - Start sending messages, the same 1000 unique keys every 1s (they do not fall 
> into the defined session gap of 100ms, so each message should create a new 
> window)
> - The state of the process window operator keeps increasing - after 1 million 
> messages it ended up at around 2 MB
> - Stop sending messages and wait until the RabbitMQ queue is fully consumed 
> and a few checkpoints go by
> - Expected the state size to decrease to the base value, but it stayed at 2 MB
> - Continued to send messages with the same keys and the state kept its 
> increasing trend
> What I checked:
> - Registration and deregistration of timestamps set for time windows - each 
> registration matched its deregistration
> - Checked that in fact there are no window merges
> - Tried a custom Trigger that disables window merges and returns 
> TriggerResult.FIRE_AND_PURGE from onProcessingTime - same state behavior (a 
> rough sketch of the purging variant is below)
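>
> A rough, hypothetical sketch of the FIRE_AND_PURGE part of that trigger (the 
> class name is made up; note that a merging assigner such as session windows 
> requires canMerge() to return true, so merging itself stays supported here):
> {code:java}
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>
> public class PurgingProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
>
>     @Override
>     public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
>         // fire and drop the window contents instead of keeping them
>         return TriggerResult.FIRE_AND_PURGE;
>     }
>
>     @Override
>     public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
>         return TriggerResult.CONTINUE;
>     }
>
>     @Override
>     public boolean canMerge() {
>         return true;
>     }
>
>     @Override
>     public void onMerge(TimeWindow window, OnMergeContext ctx) {
>         // re-register the processing-time timer for the merged window
>         ctx.registerProcessingTimeTimer(window.maxTimestamp());
>     }
>
>     @Override
>     public void clear(TimeWindow window, TriggerContext ctx) {
>         ctx.deleteProcessingTimeTimer(window.maxTimestamp());
>     }
> }
> {code}
>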
> On the staging environment, we noticed that the state for that operator keeps 
> increasing indefinitely, after some months reaching even 1.5 GB for 100k 
> unique keys
> Flink commit id: 9c32ed9
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
