Hi,

What is your TaskManager memory configuration? Can you post the TaskManager's log?

Regards,

Kien


On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:
Hi,

We have several Flink jobs, all of which read data from Kafka, do some 
aggregations (over sliding windows of size 1d and slide 1h) and write data to 
Cassandra. Something like:

```
DataStream<String> lines = env.addSource(new FlinkKafkaConsumer010( … ));
DataStream<Event> events = lines.map(line -> parse(line));
DataStream<Statistics> stats = events
        .keyBy("id")
        .timeWindow(Time.days(1), Time.hours(1))  // sliding window: size 1d, slide 1h
        .sum(new MyAggregateFunction());          // shorthand for our custom aggregation
writeToCassandra(stats);
```
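
For a sense of scale, a sliding window assigns each element to size / slide overlapping windows, so with the (1d, 1h) window above every event lands in 24 windows, and an incrementally aggregated state keeps roughly that many live accumulators per key. A minimal sketch of that arithmetic follows; `SlidingWindowMath` and `windowsPerElement` are hypothetical helper names, not part of Flink's API, and the calculation assumes the window size is an exact multiple of the slide.

```java
import java.time.Duration;

// Back-of-the-envelope helper; hypothetical, not part of Flink's API.
final class SlidingWindowMath {

    // Number of overlapping windows a single element falls into,
    // assuming the window size is an exact multiple of the slide.
    static long windowsPerElement(Duration size, Duration slide) {
        if (slide.isZero() || slide.isNegative() || size.compareTo(slide) < 0) {
            throw new IllegalArgumentException("need 0 < slide <= size");
        }
        return size.toMillis() / slide.toMillis();
    }

    public static void main(String[] args) {
        // The window from the snippet above: size 1 day, slide 1 hour.
        long n = windowsPerElement(Duration.ofDays(1), Duration.ofHours(1));
        System.out.println("windows per element: " + n);
    }
}
```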

We recently switched to RocksDBStateBackend for its suitability for 
large state and long windows. However, since the switch a memory issue has 
come up: memory utilisation on the TaskManager gradually increases from 50 GB 
to ~63 GB until the container is killed. We are unable to figure out what is 
causing this behaviour. Is there a memory leak in RocksDB?

How much memory should we allocate to the Flink TaskManager? Since RocksDB is 
a native library that allocates memory outside the JVM, how much of the memory 
should we leave for RocksDB (out of 64 GB of total memory)? 
Is there a way to set the maximum amount of memory that RocksDB will use, 
so that it doesn't overwhelm the system? Are there recommended 
settings for RocksDB for larger state (for the 1-day window, the average state 
size is 3 GB)?
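
One rough way to reason about the heap/native split asked about above is to bound RocksDB's memtable and block-cache memory per column family and multiply by the number of column families on the machine. The sketch below assumes that each keyed state in each slot is backed by its own column family with its own write buffers and block cache. The class and method names are hypothetical, and the 64 MiB / 2 buffers / 8 MiB figures, the 4 states and the 16 slots are illustrative assumptions that should be checked against the actual RocksDB options and job layout.

```java
// Rough estimate only; all names here are hypothetical, not Flink or RocksDB APIs.
final class RocksDbMemoryBudget {
    static final long MIB = 1024L * 1024L;

    // Upper bound on memtable plus block-cache memory for one column family.
    static long perColumnFamilyBytes(long writeBufferBytes, int maxWriteBuffers, long blockCacheBytes) {
        return writeBufferBytes * maxWriteBuffers + blockCacheBytes;
    }

    // Estimate for one TaskManager, assuming one column family per state per slot.
    static long totalBytes(long perColumnFamilyBytes, int statesPerSlot, int slots) {
        return perColumnFamilyBytes * statesPerSlot * slots;
    }

    public static void main(String[] args) {
        // Assumed settings: 64 MiB write buffer, 2 write buffers, 8 MiB block cache.
        long perCf = perColumnFamilyBytes(64 * MIB, 2, 8 * MIB);
        // Assumed layout: 4 keyed states per slot, 16 slots on the TaskManager.
        long total = totalBytes(perCf, 4, 16);
        System.out.println("estimated RocksDB memory: " + total / MIB + " MiB");
    }
}
```

The estimate does not cover index and filter blocks or memory pinned by iterators, so the JVM heap would need to be sized to leave headroom beyond this figure inside the container limit.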

Any help would be greatly appreciated. I am using Flink v1.2.1.
Thanks in advance.

Best,
Shashwat
