Hi William

I find it hard to believe that the same job would hold 70-80 GB of state with
RocksDB but only 200 MB with the HeapStateBackend, even allowing for RocksDB's
space amplification. Are you sure the job received the same input throughput
with both state backends, and that both ran well without any failover? Could
you take a savepoint of the job with each state backend and compare the sizes
of the savepoints? Also, which version of Flink are you using?

Best
Yun Tang
________________________________
From: William Jonsson <william.jons...@niradynamics.se>
Sent: Friday, August 30, 2019 17:04
To: user@flink.apache.org <user@flink.apache.org>
Cc: Fleet Perception for Maintenance <fleetperceptionformaintena...@niradynamics.se>
Subject: Non incremental window function accumulates unbounded state with RocksDb


Hello,

I have a Flink pipeline that reads data from Kafka. The stream is keyed (in the
pseudocode example below it is keyed on the first letter of the string; in our
pipeline it is keyed on a predefined key) and processed in sliding windows with
a size of 60 minutes and a slide of 10 minutes. The time characteristic is
event time, and each window processes its data only when it fires; there is no
incremental processing of the windowed data.
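
To make the window setup above concrete: with a 60-minute size and a 10-minute slide, every element falls into 60 / 10 = 6 overlapping windows, and without incremental aggregation each of those windows buffers the element until it fires. The plain-Java sketch below reproduces that assignment arithmetic outside of Flink. The class and method names are hypothetical, and a window offset of 0 is assumed.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {

    // Start timestamps (ms) of every sliding window that contains the given event time.
    // Assumes a window offset of 0.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is at or before the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        List<Long> starts = windowStarts(125 * minute, 60 * minute, 10 * minute);
        System.out.println(starts.size() + " windows hold this element");
        for (long start : starts) {
            long startMinute = start / minute;
            System.out.println("[" + startMinute + "m, " + (startMinute + 60) + "m)");
        }
    }
}
```

For an element at minute 125 this lists the six windows starting at minutes 120, 110, 100, 90, 80 and 70, so each element is written to window state six times before any window is cleared.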

When running with the Heap backend the state behaves “normally”, i.e. it stays
within the data size that is expected once the windows have buffered the data
(~200 MB for this application) and remains bounded around this size regardless
of how long the pipeline has been running. However, if the state backend is
changed to the RocksDB backend, the state starts to grow without bound (as far
as we have observed, it never stops growing), reaching 70-80 GB after just
over a month of runtime.

I took a savepoint of the state, downloaded it, and analysed it. The analysis
showed that the state consisted of data from the whole lifetime of the
pipeline, of about equal size for each day. I interpret this as the state
having accumulated old data that should have been deleted when the windows
were cleared. It is worth noting that the state consists of the input Strings
only, so it should have nothing to do with the histogram calculation.
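
For reference on when windows are cleared: Flink's event-time window operator drops a window's buffered elements once the watermark reaches the window's last timestamp plus the allowed lateness (0 by default). The plain-Java sketch below only illustrates that rule. The class and method names are hypothetical and are not Flink API.

```java
final class WindowCleanupSketch {

    // Last timestamp (ms) that still belongs to a window [start, end).
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    // Event time at which the window's buffered elements become eligible for deletion.
    static long cleanupTime(long windowEnd, long allowedLateness) {
        return maxTimestamp(windowEnd) + allowedLateness;
    }

    // The window can only be cleared once the watermark has reached the cleanup time.
    static boolean canBeCleared(long watermark, long windowEnd, long allowedLateness) {
        return watermark >= cleanupTime(windowEnd, allowedLateness);
    }

    public static void main(String[] args) {
        long minute = 60_000L;
        long windowEnd = 130 * minute;
        System.out.println(canBeCleared(129 * minute, windowEnd, 0)); // false
        System.out.println(canBeCleared(130 * minute, windowEnd, 0)); // true
    }
}
```

Under this rule, data from old windows remaining in a savepoint would mean either that the watermark never reached the cleanup time for those windows, or that the cleared entries were not physically removed from the backend.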

I have tried setting level0_file_num_compaction_trigger to 1 and reducing the
base file size as well as the target file size to trigger more compactions, in
the hope that the compactions would remove the obsolete data. This brought no
improvement at all (it basically got worse).

Please see the pseudocode below for a code example. The real pipeline is more
complex than this and uses classes other than a String input and a “histogram”
output class. Do you have any input or ideas on why the state is manageable
with the Heap backend but grows out of hand with the RocksDB backend?

Best regards,

William



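import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend
import org.apache.flink.runtime.state.StateBackend
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.util.Collector

// Non-incremental window function: all elements of a window are buffered until it fires.
// Histogram (the output type) is defined elsewhere in the real pipeline.
class HistogramFunction extends WindowFunction[String, Histogram, Char, TimeWindow] {

  def process(key: Char, window: TimeWindow, input: Iterable[String]): Histogram = {
    // Calculate the histogram (omitted in this pseudocode)
    ???
  }

  override def apply(key: Char, window: TimeWindow, input: Iterable[String],
                     out: Collector[Histogram]): Unit = {
    out.collect(process(key, window, input))
  }
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

env.getCheckpointConfig.setCheckpointTimeout(400000)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(450000)

// The second argument enables incremental checkpoints; the S3 path is elided.
val stateBackend: StateBackend = new RocksDBStateBackend("s3://..", true)
env.setStateBackend(stateBackend)
env.enableCheckpointing(900000)

// properties: Kafka consumer configuration (java.util.Properties), defined elsewhere.
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer08[String]("topic", new SimpleStringSchema(), properties))

// Keyed on the first character; 60-minute windows sliding every 10 minutes.
stream
  .keyBy(e => e.charAt(0))
  .timeWindow(Time.minutes(60), Time.minutes(10))
  .apply(new HistogramFunction())
  .name("Pseudocode")
  .uid("Pseudocode")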



William Jonsson
Systems Engineer
Fleet Perception for Maintenance        
NIRA Dynamics AB
Wallenbergs Gata 4
583 30 Linköping
Sweden  Mobile: +46 722 178 247
william.jons...@niradynamics.se
www.niradynamics.se
Together for smarter safety
