Re: Memory Leak - Flink / RocksDB ?

2017-07-28 Thread Stefan Richter
Hi,

I see that matching the RocksDB configuration to fit certain container sizes
can be very tedious and error-prone for users. I have opened a JIRA issue to
start improving the situation: https://issues.apache.org/jira/browse/FLINK-7289
Please feel free to comment and share your experiences or ideas; they could be
very valuable input.

One consideration: from what you shared I can see that you are using 8 slots
per TaskManager and a heap size of 35840 MB. This means that there are potentially
up to 8 RocksDB instances on one TM. Furthermore, when you are using RocksDB,
your heavy state will typically live in RocksDB (native memory) and no longer
on the JVM heap. I think it would make a lot of sense to reduce your maximum
heap size dramatically, so that more memory from your container budget is
available as native memory for RocksDB. I hope this can also help with your
problem.
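To make the arithmetic concrete (the 64 GB container, 35840 MB heap, and 8 slots are taken from this thread; the smaller heap value below is purely illustrative, not a recommendation):

```java
// Sketch of the memory split described above: whatever the JVM heap does
// not claim from the container budget is what RocksDB (native memory) can
// use, shared across all slots on the TaskManager.
public class MemoryBudget {
    public static void main(String[] args) {
        int containerMb = 64 * 1024;   // 64 GB machine, from the thread
        int heapMb = 35840;            // current taskmanager.heap.mb
        int slots = 8;                 // taskmanager.numberOfTaskSlots

        int nativeMb = containerMb - heapMb;
        System.out.println("Native memory left for RocksDB: " + nativeMb + " MB");  // 29696 MB
        System.out.println("Per-slot RocksDB budget: " + (nativeMb / slots) + " MB");  // 3712 MB

        // With a smaller heap (illustrative value only), far more of the
        // container becomes available to RocksDB:
        int smallerHeapMb = 10 * 1024;
        int nativeAfterMb = containerMb - smallerHeapMb;
        System.out.println("Per-slot budget after shrinking heap: " + (nativeAfterMb / slots) + " MB");  // 6912 MB
    }
}
```

Roughly, every MB taken away from the heap becomes native memory that RocksDB's memtables and block caches can use.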

Best,
Stefan

> On 27.07.2017, at 10:49, Shashwat Rastogi wrote:
> 
> Hi Kien,
> 
> Sorry it took me some time to fetch the logs. I am attaching logs of the
> machine which died due to lack of free memory.
> 
> 
> 
> I have set only
> `taskmanager.heap.mb: 35840`
> `taskmanager.numberOfTaskSlots: 8`
> and the rest are just the default properties in flink-conf.yaml.
> 
> Thank you in advance.
> 
> Regards
> Shashwat
> 
>> On 26-Jul-2017, at 12:10 PM, Kien Truong wrote:
>> 
>> Hi,
>> 
>> What's your task manager memory configuration? Can you post the
>> TaskManager's log?
>> 
>> Regards,
>> 
>> Kien
>> 
>> 
>> On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:
>>> Hi,
>>> 
>>> We have several Flink jobs, all of which read data from Kafka, do some
>>> aggregations (over sliding windows of (1d, 1h)), and write the results to
>>> Cassandra. Something like:
>>> 
>>> ```
>>> DataStream lines = env.addSource(new FlinkKafkaConsumer010( … ));
>>> DataStream events = lines.map(line -> parse(line));
>>> DataStream stats = events
>>>     .keyBy("id")
>>>     .timeWindow(Time.days(1), Time.hours(1))
>>>     .apply(new MyAggregateFunction());
>>> writeToCassandra(stats);
>>> ```
>>> 
>>> We recently switched to the RocksDBStateBackend, for its suitability for
>>> large states/long windows. However, after making the switch a memory issue
>>> has come up: the memory utilisation on the TaskManager gradually increases from
>>> 50 GB to ~63 GB until the container is killed. We are unable to figure out
>>> what is causing this behaviour; is there a memory leak in RocksDB?
>>> 
>>> How much memory should we allocate to the Flink TaskManager? Since RocksDB
>>> is a native application and does not use the JVM, how much of the memory
>>> should we allocate/leave for RocksDB (out of 64 GB of total memory)?
>>> Is there a way to cap the amount of memory that will be used by
>>> RocksDB so that it doesn't overwhelm the system? Are there some
>>> recommended optimal settings for RocksDB for larger states (for a 1-day
>>> window the average state size is 3 GB)?
>>> 
>>> Any help would be greatly appreciated. I am using Flink v1.2.1.
>>> Thanks in advance.
>>> 
>>> Best,
>>> Shashwat
>> 
> 



Re: Memory Leak - Flink / RocksDB ?

2017-07-27 Thread Kien Truong

Hello,

This is the system log, not the task manager's application log, which is 
what I was referring to.


If you're using the standalone cluster, then the task manager log should
be in the log directory inside your Flink installation.


Regards,
Kien

On 7/27/2017 3:49 PM, Shashwat Rastogi wrote:

Hi Kien,

Sorry it took me some time to fetch the logs. I am attaching logs of
the machine which died due to lack of free memory.





I have set only
`taskmanager.heap.mb: 35840`
`taskmanager.numberOfTaskSlots: 8`
and the rest are just the default properties in flink-conf.yaml.

Thank you in advance.

Regards
Shashwat

On 26-Jul-2017, at 12:10 PM, Kien Truong wrote:


Hi,

What's your task manager memory configuration? Can you post the
TaskManager's log?


Regards,

Kien


On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:

Hi,

We have several Flink jobs, all of which read data from Kafka, do
some aggregations (over sliding windows of (1d, 1h)), and write the results
to Cassandra. Something like:


```
DataStream lines = env.addSource(new FlinkKafkaConsumer010( … ));
DataStream events = lines.map(line -> parse(line));
DataStream stats = events
    .keyBy("id")
    .timeWindow(Time.days(1), Time.hours(1))
    .apply(new MyAggregateFunction());
writeToCassandra(stats);
```

We recently switched to the RocksDBStateBackend, for its
suitability for large states/long windows. However, after making the
switch a memory issue has come up: the memory utilisation on the
TaskManager gradually increases from 50 GB to ~63 GB until the
container is killed. We are unable to figure out what is causing
this behaviour; is there a memory leak in RocksDB?


How much memory should we allocate to the Flink TaskManager? Since
RocksDB is a native application and does not use the JVM, how much
of the memory should we allocate/leave for RocksDB (out of 64 GB of
total memory)?
Is there a way to cap the amount of memory that will be used
by RocksDB so that it doesn't overwhelm the system? Are there some
recommended optimal settings for RocksDB for larger states (for a
1-day window the average state size is 3 GB)?


Any help would be greatly appreciated. I am using Flink v1.2.1.
Thanks in advance.

Best,
Shashwat








Re: Memory Leak - Flink / RocksDB ?

2017-07-27 Thread Shashwat Rastogi
Hi Kien,

Sorry it took me some time to fetch the logs. I am attaching logs of the
machine which died due to lack of free memory.

Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Created slice user-0.slice.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Starting user-0.slice.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Started Session 47284 of user root.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Starting Session 47284 of user root.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Removed slice user-0.slice.
Jun 26 13:00:01 staging-east-dataplatform-02-c01 systemd: Stopping user-0.slice.
Jun 26 13:01:01 staging-east-dataplatform-02-c01 newrelic-infra:
time="2017-06-26T13:01:01Z" level=error msg="can't get sample from
ProcessSampler" error="open /proc/32131: no such file or directory"
[the same systemd slice/session pattern repeats once per minute through 13:08]

Re: Memory Leak - Flink / RocksDB ?

2017-07-26 Thread Shashwat Rastogi
Hi Vinay,

I am setting the RocksDBStateBackend from the code, not from
flink-conf.yaml.
I am currently trying out the configurations that you have shared. I'll let you
know how they perform. Thank you so much for your help.

However, were you able to figure out what exactly was going wrong with the
RocksDB setup?
And, while making these modifications, did you notice any significant drop in
RocksDB performance?

Overall, how do you think we should configure memory for the TaskManager?
Is there a way to estimate how much memory is sufficient for the task manager
heap and for the RocksDBStateBackend?

Thank you

Best,
Shashwat




Re: Memory Leak - Flink / RocksDB ?

2017-07-25 Thread Kien Truong

Hi,

What's your task manager memory configuration? Can you post the
TaskManager's log?


Regards,

Kien


On 7/25/2017 8:41 PM, Shashwat Rastogi wrote:

Hi,

We have several Flink jobs, all of which read data from Kafka, do some
aggregations (over sliding windows of (1d, 1h)), and write the results to
Cassandra. Something like:

```
DataStream lines = env.addSource(new FlinkKafkaConsumer010( … ));
DataStream events = lines.map(line -> parse(line));
DataStream stats = events
    .keyBy("id")
    .timeWindow(Time.days(1), Time.hours(1))
    .apply(new MyAggregateFunction());
writeToCassandra(stats);
```
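One thing worth noting about the job above: a sliding window of size 1 day with a 1-hour slide assigns every incoming record to 24 overlapping windows, so the state backend holds many windows' worth of data per key at any moment. A quick sketch of that multiplier (illustrative arithmetic only):

```java
// A sliding window assigns each element to size/slide overlapping windows,
// so live state is roughly that multiple of a single window's state.
public class SlidingWindowOverlap {
    public static void main(String[] args) {
        long windowHours = 24; // 1 day window size
        long slideHours = 1;   // 1 hour slide
        long concurrentWindows = windowHours / slideHours;
        System.out.println("Windows each element belongs to: " + concurrentWindows); // 24
    }
}
```

This overlap is one reason the RocksDB working set can be much larger than the size of any single window.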

We recently switched to the RocksDBStateBackend, for its suitability for
large states/long windows. However, after making the switch a memory issue has
come up: the memory utilisation on the TaskManager gradually increases from 50 GB
to ~63 GB until the container is killed. We are unable to figure out what is
causing this behaviour; is there a memory leak in RocksDB?

How much memory should we allocate to the Flink TaskManager? Since RocksDB is
a native application and does not use the JVM, how much of the memory should
we allocate/leave for RocksDB (out of 64 GB of total memory)?
Is there a way to cap the amount of memory that will be used by RocksDB
so that it doesn't overwhelm the system? Are there some recommended optimal
settings for RocksDB for larger states (for a 1-day window the average state
size is 3 GB)?

Any help would be greatly appreciated. I am using Flink v1.2.1.
Thanks in advance.

Best,
Shashwat




Re: Memory Leak - Flink / RocksDB ?

2017-07-25 Thread vinay patil
Hi Shashwat,

Are you specifying the RocksDBStateBackend from the flink-conf.yaml or from
code?

If you are specifying it from code, you can try using
PredefinedOptions.FLASH_SSD_OPTIMIZED.
Also, you can try enabling incremental checkpointing (this feature is new in
Flink 1.3.0).

If the above does not solve your issue, you can control the memory usage of
RocksDB by tuning the following values and checking your performance:

*DBOptions:*
  (along with FLASH_SSD_OPTIMIZED, add the following)
  maxBackgroundCompactions(4)

*ColumnFamilyOptions:*
  write_buffer_size : 512 MB
  block_cache_size : 128 MB
  max_write_buffer_number : 5
  min_write_buffer_number_to_merge : 2
  cacheIndexAndFilterBlocks : true
  optimizeFilterForHits : true
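For a rough sense of scale, the memtable and cache numbers above imply a per-column-family upper bound of about 2.7 GB, before index/filter blocks and compaction overhead (back-of-the-envelope only; in Flink's RocksDB backend each registered state gets its own column family and each parallel keyed operator instance its own RocksDB instance, so the budget multiplies):

```java
// Back-of-the-envelope upper bound implied by the settings above:
// memtables (write buffers) plus the block cache, per column family.
public class RocksDbBudget {
    public static void main(String[] args) {
        int writeBufferMb = 512;  // write_buffer_size
        int maxWriteBuffers = 5;  // max_write_buffer_number
        int blockCacheMb = 128;   // block_cache_size

        int perColumnFamilyMb = writeBufferMb * maxWriteBuffers + blockCacheMb;
        System.out.println("Upper bound per column family: " + perColumnFamilyMb + " MB"); // 2688 MB
    }
}
```

Multiplied across 8 slots, even one state per job can legitimately consume tens of GB of native memory with these settings, which is worth keeping in mind when sizing the container.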


I would recommend reading the following documents:

*Memory usage of RocksDB* :
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB

*RocksDB Tuning Guide:*
https://github.com/facebook/rocksdb/wiki/RocksDB-Tuning-Guide

Hope it helps.


Regards,
Vinay Patil

On Tue, Jul 25, 2017 at 6:51 PM, Shashwat Rastogi [via Apache Flink User
Mailing List archive.] wrote:

> Hi,
>
> We have several Flink jobs, all of which read data from Kafka, do some
> aggregations (over sliding windows of (1d, 1h)), and write the results to
> Cassandra. Something like:
>
> ```
> DataStream lines = env.addSource(new FlinkKafkaConsumer010( … ));
> DataStream events = lines.map(line -> parse(line));
> DataStream stats = events
>     .keyBy("id")
>     .timeWindow(Time.days(1), Time.hours(1))
>     .apply(new MyAggregateFunction());
> writeToCassandra(stats);
> ```
>
> We recently switched to the RocksDBStateBackend, for its suitability for
> large states/long windows. However, after making the switch a memory issue
> has come up: the memory utilisation on the TaskManager gradually increases from
> 50 GB to ~63 GB until the container is killed. We are unable to figure out
> what is causing this behaviour; is there a memory leak in RocksDB?
>
> How much memory should we allocate to the Flink TaskManager? Since
> RocksDB is a native application and does not use the JVM, how much of the
> memory should we allocate/leave for RocksDB (out of 64 GB of total memory)?
> Is there a way to cap the amount of memory that will be used by
> RocksDB so that it doesn't overwhelm the system? Are there some
> recommended optimal settings for RocksDB for larger states (for a 1-day
> window the average state size is 3 GB)?
>
> Any help would be greatly appreciated. I am using Flink v1.2.1.
> Thanks in advance.
>
> Best,
> Shashwat
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-Leak-Flink-RocksDB-tp14439.html




--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Memory-Leak-Flink-RocksDB-tp14439p14441.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.