Re: Flink memory leak

2017-11-17 Thread Piotr Nowojski
>>>>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>>>>> Hi Ebru and Javier,
>>>>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>>>>> Ebru: could you explain in a little more details how does
>>>>>>>>> your Job(s)
>>>>>>>>>> look like? Could you post some code? If you are just using
>>>>>>>>> maps and
>>>>>>>>>> filters there shouldn’t be any network transfers involved,
>>>>>>>>> aside
>>>>>>>>>> from Source and Sink functions.
>>>>>>>>>> Piotrek
>>>>>>>>>> On 8 Nov 2017, at 12:54, ebru
>>>>>>>>> <b20926...@cs.hacettepe.edu.tr 
>>>>>>>>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>>>>>>>>>> Hi Javier,
>>>>>>>>>> It would be helpful if you share your test job with us.
>>>>>>>>>> Which configurations did you try?
>>>>>>>>>> -Ebru
>>>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>>>>>>> <javier.lo...@zalando.de <mailto:javier.lo...@zalando.de>>
>>>>>>>>>> wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> We have been facing a similar problem. We have tried some
>>>>>>>>> different
>>>>>>>>>> configurations, as proposed in other email thread by Flavio
>>>>>>>>> and
>>>>>>>>>> Kien, but it didn't work. We have a workaround similar to
>>>>>>>>> the one
>>>>>>>>>> that Flavio has, we restart the taskmanagers once they reach
>>>>>>>>> a
>>>>>>>>>> memory threshold. We created a small test to remove all of
>>>>>>>>> our
>>>>>>>>>> dependencies and leave only flink native libraries. This
>>>>>>>>> test reads
>>>>>>>>>> data from a Kafka topic and writes it back to another topic
>>>>>>>>> in
>>>>>>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>>>>>>> After
>>>>>>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>>>>>>> OS memory
>>>>>>>>>> limit and dies.
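
The job itself is not shown in this excerpt (it was later shared as a gist further down the thread). A minimal sketch of such a Kafka pass-through job, assuming the Flink 1.3-era DataStream API and the 0.10 Kafka connector, with placeholder topic names and broker address, might look like this:

import java.util.Properties;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaPassThroughJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        props.setProperty("group.id", "leak-test");           // placeholder consumer group

        // Read records from one topic and write them back, unchanged, to another.
        env.addSource(new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props))
           .addSink(new FlinkKafkaProducer010<>("output-topic", new SimpleStringSchema(), props));

        env.execute("kafka-pass-through-leak-test");
    }
}

Repeatedly submitting and cancelling a job of this shape every few seconds is what drives the memory growth described above.
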
>>>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>>>>>>> slots
>>>>>>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>>>>>>> execute
>>>>>>>>>> that job 5 times in a row because the whole cluster dies. If
>>>>>>>>> you
>>>>>>>>>> want, we can publish our test job.
>>>>>>>>>> Regards,
>>>>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>>>>>>> <aljos...@apache.org <mailto:aljos...@apache.org>>
>>>>>>>>>> wrote:
>>>>>>>>>> @Nico & @Piotr Could you please have a look at this? You
>>>>>>>>> both
>>>>>>>>>> recently worked on the network stack and might be most
>>>>>>>>> familiar with
>>>>>>>>>> this.
>>>>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>>>>>>> <pomperma...@okkam.it <mailto:pomperma...@okkam.it>>
>>>>>>>>>> wrote:
>>>>>>>>>> We also have the same problem in production. At the moment
>>>>>>>>> the
>>>>>>>>>> solution is to restart the entire Flink cluster after every
>>>>>>>>> job..
>>>>>>>>>> We've tried to reproduce this problem with a test (see
>>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 
>>>>>>>>>> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we
>>>>>>>>> don't
>>>>>>>>>> know whether the error produced by the test and the leak are correlated..

Re: Flink memory leak

2017-11-14 Thread Piotr Nowojski
>>>>>>> Does this OOM issue also happen when you are not using the
>>>>>>> Kafka source/sink?
>>>>>>>>> Piotrek
>>>>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de 
>>>>>>>>> <mailto:javier.lo...@zalando.de>>
>>>>>>> wrote:
>>>>>>>>> Hi,
>>>>>>>>> This is the test flink job we created to trigger this leak
>>>>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
>>>>>>> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
>>>>>>>>> And this is the python script we are using to execute the job
>>>>>>> thousands of times to get the OOM problem
>>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
>>>>>>> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
>>>>>>>>> The cluster we used for this has this configuration:
>>>>>>>>> Instance type: t2.large
>>>>>>>>> Number of workers: 2
>>>>>>>>> HeapMemory: 5500
>>>>>>>>> Number of task slots per node: 4
>>>>>>>>> TaskMangMemFraction: 0.5
>>>>>>>>> NumberOfNetworkBuffers: 2000
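
Assuming those shorthand names correspond to the standard Flink 1.3 configuration keys (an assumption, not stated in the original message), the equivalent flink-conf.yaml entries would be roughly:

taskmanager.heap.mb: 5500
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.fraction: 0.5
taskmanager.network.numberOfBuffers: 2000
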
>>>>>>>>> We have tried several things, increasing the heap, reducing the
>>>>>>> heap, more memory fraction, changes this value in the
>>>>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>>>>>>> work.
>>>>>>>>> Thanks for your help.
>>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>>>>>> wrote:
>>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>>> Hi Ebru and Javier,
>>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>>> Ebru: could you explain in a little more details how does
>>>>>>> your Job(s)
>>>>>>>> look like? Could you post some code? If you are just using
>>>>>>> maps and
>>>>>>>> filters there shouldn’t be any network transfers involved,
>>>>>>> aside
>>>>>>>> from Source and Sink functions.
>>>>>>>> Piotrek
>>>>>>>> On 8 Nov 2017, at 12:54, ebru
>>>>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>>>>>> wrote:
>>>>>>>> Hi Javier,
>>>>>>>> It would be helpful if you share your test job with us.
>>>>>>>> Which configurations did you try?
>>>>>>>> -Ebru
>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>>>>> <javier.lo...@zalando.de <mailto:javier.lo...@zalando.de>>
>>>>>>>> wrote:
>>>>>>>> Hi,
>>>>>>>> We have been facing a similar problem. We have tried some
>>>>>>> different
>>>>>>>> configurations, as proposed in other email thread by Flavio
>>>>>>> and
>>>>>>>> Kien, but it didn't work. We have a workaround similar to
>>>>>>> the one
>>>>>>>> that Flavio has, we restart the taskmanagers once they reach
>>>>>>> a
>>>>>>>> memory threshold. We created a small test to remove all of
>>>>>>> our
>>>>>>>> dependencies and leave only flink native libraries. This
>>>>>>> test reads
>>>>>>>> data from a Kafka topic and writes it back to another topic
>>>>>>> in
>>>>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>>>>> After
>>>>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>>>>> OS memory
>>>>>>>> limit and dies.
>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>>>>> slots
>>>>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>>>>> 

Re: Flink memory leak

2017-11-14 Thread Flavio Pompermaier
 but it
> will take some time to die. It's better with some data.
> On 8 November 2017 at 14:54, Piotr Nowojski
> <pi...@data-artisans.com> wrote:
> Hi,
> Thanks for sharing this job.
> Do I need to feed some data to the Kafka to reproduce this
>
> issue with your script?
>
> Does this OOM issue also happen when you are not using the
>
> Kafka source/sink?
>
> Piotrek
> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
>
> wrote:
>
> Hi,
> This is the test flink job we created to trigger this leak
>
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>
> And this is the python script we are using to execute the job
>
> thousands of times to get the OOM problem
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskMangMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
> We have tried several things, increasing the heap, reducing the
>
> heap, more memory fraction, changes this value in the
> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
> work.
>
> Thanks for your help.
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>
> <b20926...@cs.hacettepe.edu.tr> wrote:
>
> On 2017-11-08 15:20, Piotr Nowojski wrote:
> Hi Ebru and Javier,
> Yes, if you could share this example job it would be helpful.
> Ebru: could you explain in a little more details how does
>
> your Job(s)
>
> look like? Could you post some code? If you are just using
>
> maps and
>
> filters there shouldn’t be any network transfers involved,
>
> aside
>
> from Source and Sink functions.
> Piotrek
> On 8 Nov 2017, at 12:54, ebru
>
> <b20926...@cs.hacettepe.edu.tr> wrote:
>
> Hi Javier,
> It would be helpful if you share your test job with us.
> Which configurations did you try?
> -Ebru
> On 8 Nov 2017, at 14:43, Javier Lopez
>
> <javier.lo...@zalando.de>
>
> wrote:
> Hi,
> We have been facing a similar problem. We have tried some
>
> different
>
> configurations, as proposed in other email thread by Flavio
>
> and
>
> Kien, but it didn't work. We have a workaround similar to
>
> the one
>
> that Flavio has, we restart the taskmanagers once they reach
>
> a
>
> memory threshold. We created a small test to remove all of
>
> our
>
> dependencies and leave only flink native libraries. This
>
> test reads
>
> data from a Kafka topic and writes it back to another topic
>
> in
>
> Kafka. We cancel the job and start another every 5 seconds.
>
> After
>
> ~30 minutes of doing this process, the cluster reaches the
>
> OS memory
>
> limit and dies.
> Currently, we have a test cluster with 8 workers and 8 task
>
> slots
>
> per node. We have one job that uses 56 slots, and we cannot
>
> execute
>
> that job 5 times in a row because the whole cluster dies. If
>
> you
>
> want, we can publish our test job.
> Regards,
> On 8 November 2017 at 11:20, Aljoscha Krettek
>
> <aljos...@apache.org>
>
> wrote:
> @Nico & @Piotr Could you please have a look at this? You
>
> both
>
> recently worked on the network stack and might be most
>
> familiar with
>
> this.
> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>
> <pomperma...@okkam.it>
>
> wrote:
> We also have the same problem in production. At the moment
>
> the
>
> solution is to restart the entire Flink cluster after every
>
> job..
>
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we
>
> don't
>
> know whether the error produced by the test and the leak are
> correlated..
> Best,
> Flavio
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>
> EBRU
>
> <b20926...@cs.hacettepe.edu.tr> wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share
>
> that code?
>
> If
> there is no stateful operation at all, it's strange where
>
> the list
>
> state instances are coming from.
> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>
> <b20926...@cs.hacettepe.edu.tr>
>
> wrote:
> Hi Ufuk,
> We don’t explicitly define any state descriptor. We only
>
> use map
>
> and filters
> operator. We thought that gc handle clearing the flink’s
>
> internal
>
> states.
> So how can we manage the memory if it is always increasing?
> - Ebru
> On

Re: Flink memory leak

2017-11-14 Thread Piotr Nowojski
We have been facing a similar problem. We have tried some
>>>>>> different
>>>>>>> configurations, as proposed in other email thread by Flavio
>>>>>> and
>>>>>>> Kien, but it didn't work. We have a workaround similar to
>>>>>> the one
>>>>>>> that Flavio has, we restart the taskmanagers once they reach
>>>>>> a
>>>>>>> memory threshold. We created a small test to remove all of
>>>>>> our
>>>>>>> dependencies and leave only flink native libraries. This
>>>>>> test reads
>>>>>>> data from a Kafka topic and writes it back to another topic
>>>>>> in
>>>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>>>> After
>>>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>>>> OS memory
>>>>>>> limit and dies.
>>>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>>>> slots
>>>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>>>> execute
>>>>>>> that job 5 times in a row because the whole cluster dies. If
>>>>>> you
>>>>>>> want, we can publish our test job.
>>>>>>> Regards,
>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>>>> <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>> @Nico & @Piotr Could you please have a look at this? You
>>>>>> both
>>>>>>> recently worked on the network stack and might be most
>>>>>> familiar with
>>>>>>> this.
>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>>>> <pomperma...@okkam.it>
>>>>>>> wrote:
>>>>>>> We also have the same problem in production. At the moment
>>>>>> the
>>>>>>> solution is to restart the entire Flink cluster after every
>>>>>> job..
>>>>>>> We've tried to reproduce this problem with a test (see
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we
>>>>>> don't
>>>>>>> know whether the error produced by the test and the leak are
>>>>>>> correlated..
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>>>>>> EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>>>> Do you use any windowing? If yes, could you please share
>>>>>> that code?
>>>>>>> If
>>>>>>> there is no stateful operation at all, it's strange where
>>>>>> the list
>>>>>>> state instances are coming from.
>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>>>>>> <b20926...@cs.hacettepe.edu.tr>
>>>>>>> wrote:
>>>>>>> Hi Ufuk,
>>>>>>> We don’t explicitly define any state descriptor. We only
>>>>>> use map
>>>>>>> and filters
>>>>>>> operator. We thought that gc handle clearing the flink’s
>>>>>> internal
>>>>>>> states.
>>>>>>> So how can we manage the memory if it is always increasing?
>>>>>>> - Ebru
>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>> Hey Ebru, the memory usage might be increasing as long as a
>>>>>> job is
>>>>>>> running.
>>>>>>> This is expected (also in the case of multiple running
>>>>>> jobs). The
>>>>>>> screenshots are not helpful in that regard. :-(
>>>>>>> What kind of stateful operations are you using? Depending on
>>>>>> your
>>>>>>> use case,
>>>>>>> you have to manually call `clear()` on the state instance in
>>>>>> order
>>>>>>> to
>>>>>>> release the managed state.
>>>>>>> Best,
>>>>>>> Ufuk
>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>>>>

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
>>>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>>>>>>> The cluster we used for this has this configuration:
>>>>>>>>>>> Instance type: t2.large
>>>>>>>>>>> Number of workers: 2
>>>>>>>>>>> HeapMemory: 5500
>>>>>>>>>>> Number of task slots per node: 4
>>>>>>>>>>> TaskMangMemFraction: 0.5
>>>>>>>>>>> NumberOfNetworkBuffers: 2000
>>>>>>>>>>> We have tried several things, increasing the heap, reducing the
>>>>>>>>> heap, more memory fraction, changes this value in the
>>>>>>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>>>>>>>>> work.
>>>>>>>>>>> Thanks for your help.
>>>>>>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>>>>> Hi Ebru and Javier,
>>>>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>>>>> Ebru: could you explain in a little more details how does
>>>>>>>>> your Job(s)
>>>>>>>>>> look like? Could you post some code? If you are just using
>>>>>>>>> maps and
>>>>>>>>>> filters there shouldn’t be any network transfers involved,
>>>>>>>>> aside
>>>>>>>>>> from Source and Sink functions.
>>>>>>>>>> Piotrek
>>>>>>>>>> On 8 Nov 2017, at 12:54, ebru
>>>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>>>>> Hi Javier,
>>>>>>>>>> It would be helpful if you share your test job with us.
>>>>>>>>>> Which configurations did you try?
>>>>>>>>>> -Ebru
>>>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>>>>>>> <javier.lo...@zalando.de>
>>>>>>>>>> wrote:
>>>>>>>>>> Hi,
>>>>>>>>>> We have been facing a similar problem. We have tried some
>>>>>>>>> different
>>>>>>>>>> configurations, as proposed in other email thread by Flavio
>>>>>>>>> and
>>>>>>>>>> Kien, but it didn't work. We have a workaround similar to
>>>>>>>>> the one
>>>>>>>>>> that Flavio has, we restart the taskmanagers once they reach
>>>>>>>>> a
>>>>>>>>>> memory threshold. We created a small test to remove all of
>>>>>>>>> our
>>>>>>>>>> dependencies and leave only flink native libraries. This
>>>>>>>>> test reads
>>>>>>>>>> data from a Kafka topic and writes it back to another topic
>>>>>>>>> in
>>>>>>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>>>>>>> After
>>>>>>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>>>>>>> OS memory
>>>>>>>>>> limit and dies.
>>>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>>>>>>> slots
>>>>>>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>>>>>>> execute
>>>>>>>>>> that job 5 times in a row because the whole cluster dies. If
>>>>>>>>> you
>>>>>>>>>> want, we can publish our test job.
>>>>>>>>>> Regards,
>>>>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>>>>>>> <aljos...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>> @Nico & @Piotr Could you please have a look at this? You
>>>>>>>>> both
>>>>>>>>>> recently worked on the network stack and might be most familiar with this.

Re: Flink memory leak

2017-11-10 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU
We have tried several things: increasing the heap, reducing the heap, more
memory fraction, changing this value in the taskmanager.sh
("TM_MAX_OFFHEAP_SIZE"="2G"); and nothing seems to work.

Thanks for your help.
On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU

<b20926...@cs.hacettepe.edu.tr> wrote:

On 2017-11-08 15:20, Piotr Nowojski wrote:
Hi Ebru and Javier,
Yes, if you could share this example job it would be helpful.
Ebru: could you explain in a little more details how does

your Job(s)

look like? Could you post some code? If you are just using

maps and

filters there shouldn’t be any network transfers involved,

aside

from Source and Sink functions.
Piotrek
On 8 Nov 2017, at 12:54, ebru

<b20926...@cs.hacettepe.edu.tr> wrote:

Hi Javier,
It would be helpful if you share your test job with us.
Which configurations did you try?
-Ebru
On 8 Nov 2017, at 14:43, Javier Lopez

<javier.lo...@zalando.de>

wrote:
Hi,
We have been facing a similar problem. We have tried some

different

configurations, as proposed in other email thread by Flavio

and

Kien, but it didn't work. We have a workaround similar to

the one

that Flavio has, we restart the taskmanagers once they reach

a

memory threshold. We created a small test to remove all of

our

dependencies and leave only flink native libraries. This

test reads

data from a Kafka topic and writes it back to another topic

in

Kafka. We cancel the job and start another every 5 seconds.

After

~30 minutes of doing this process, the cluster reaches the

OS memory

limit and dies.
Currently, we have a test cluster with 8 workers and 8 task

slots

per node. We have one job that uses 56 slots, and we cannot

execute

that job 5 times in a row because the whole cluster dies. If

you

want, we can publish our test job.
Regards,
On 8 November 2017 at 11:20, Aljoscha Krettek

<aljos...@apache.org>

wrote:
@Nico & @Piotr Could you please have a look at this? You

both

recently worked on the network stack and might be most

familiar with

this.
On 8. Nov 2017, at 10:25, Flavio Pompermaier

<pomperma...@okkam.it>

wrote:
We also have the same problem in production. At the moment

the

solution is to restart the entire Flink cluster after every

job..

We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we

don't

know whether the error produced by the test and the leak are
correlated..
Best,
Flavio
On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA

EBRU

<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any windowing? If yes, could you please share

that code?

If
there is no stateful operation at all, it's strange where

the list

state instances are coming from.
On Tue, Nov 7, 2017 at 2:35 PM, ebru

<b20926...@cs.hacettepe.edu.tr>

wrote:
Hi Ufuk,
We don’t explicitly define any state descriptor. We only

use map

and filters
operator. We thought that gc handle clearing the flink’s

internal

states.
So how can we manage the memory if it is always increasing?
- Ebru
On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
Hey Ebru, the memory usage might be increasing as long as a

job is

running.
This is expected (also in the case of multiple running

jobs). The

screenshots are not helpful in that regard. :-(
What kind of stateful operations are you using? Depending on

your

use case,
you have to manually call `clear()` on the state instance in

order

to
release the managed state.
Best,
Ufuk
On Tue, Nov 7, 2017 at 12:43 PM, ebru
<b20926...@cs.hacettepe.edu.tr> wrote:
Begin forwarded message:
From: ebru <b20926...@cs.hacettepe.edu.tr>
Subject: Re: Flink memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi <u...@apache.org>
Hi Ufuk,
There are three snapshots of htop output.
1. snapshot is initial state.
2. snapshot is after submitted one job.
3. Snapshot is the output of the one job with 15000 EPS. And

the

memory
usage is always increasing over time.
<1.png><2.png><3.png>
On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
Hey Ebru,
let me pull in Aljoscha (CC'd) who might have an idea what's

causing

this.
Since multiple jobs are running, it will be hard to

understand to

which job the state descriptors from the heap snapshot

belong to.

- Is it possible to isolate the problem and reproduce the

behaviour

with only a single job?
– Ufuk
On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU

ÇETİNKAYA EBRU

<b20926...@cs.hacettepe.edu.tr> wrote:
Hi,
We are using Flink 1.3.1 in production, we have one job

manager and

3 task
managers in standalone mode. Recently, we've noticed that we

have

memory
related problems. We use docker container to serve Flink

cluster. We

have
300 slots and 20 jobs are running with parallelism of 10.

Also the

job
count
may change over time. Taskmanager memory usage always

increases.

After
job cancelation this memory usage doesn't decrease. We've

tried to

investigate the problem and we've got the task manager jvm heap snapshot.

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
We created a small test to remove all of
>>>>>> our
>>>>>>> dependencies and leave only flink native libraries. This
>>>>>> test reads
>>>>>>> data from a Kafka topic and writes it back to another topic
>>>>>> in
>>>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>>>> After
>>>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>>>> OS memory
>>>>>>> limit and dies.
>>>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>>>> slots
>>>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>>>> execute
>>>>>>> that job 5 times in a row because the whole cluster dies. If
>>>>>> you
>>>>>>> want, we can publish our test job.
>>>>>>> Regards,
>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>>>> <aljos...@apache.org>
>>>>>>> wrote:
>>>>>>> @Nico & @Piotr Could you please have a look at this? You
>>>>>> both
>>>>>>> recently worked on the network stack and might be most
>>>>>> familiar with
>>>>>>> this.
>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>>>> <pomperma...@okkam.it>
>>>>>>> wrote:
>>>>>>> We also have the same problem in production. At the moment
>>>>>> the
>>>>>>> solution is to restart the entire Flink cluster after every
>>>>>> job..
>>>>>>> We've tried to reproduce this problem with a test (see
>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we
>>>>>> don't
>>>>>>> know whether the error produced by the test and the leak are
>>>>>>> correlated..
>>>>>>> Best,
>>>>>>> Flavio
>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>>>>>> EBRU
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>>>> Do you use any windowing? If yes, could you please share
>>>>>> that code?
>>>>>>> If
>>>>>>> there is no stateful operation at all, it's strange where
>>>>>> the list
>>>>>>> state instances are coming from.
>>>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>>>>>> <b20926...@cs.hacettepe.edu.tr>
>>>>>>> wrote:
>>>>>>> Hi Ufuk,
>>>>>>> We don’t explicitly define any state descriptor. We only
>>>>>> use map
>>>>>>> and filters
>>>>>>> operator. We thought that gc handle clearing the flink’s
>>>>>> internal
>>>>>>> states.
>>>>>>> So how can we manage the memory if it is always increasing?
>>>>>>> - Ebru
>>>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>>>> Hey Ebru, the memory usage might be increasing as long as a
>>>>>> job is
>>>>>>> running.
>>>>>>> This is expected (also in the case of multiple running
>>>>>> jobs). The
>>>>>>> screenshots are not helpful in that regard. :-(
>>>>>>> What kind of stateful operations are you using? Depending on
>>>>>> your
>>>>>>> use case,
>>>>>>> you have to manually call `clear()` on the state instance in
>>>>>> order
>>>>>>> to
>>>>>>> release the managed state.
>>>>>>> Best,
>>>>>>> Ufuk
>>>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> Begin forwarded message:
>>>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>>>> Subject: Re: Flink memory leak
>>>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>>> Hi Ufuk,
>>>>>>> There are three snapshots of htop output.

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
 but it
>>>>> will take some time to die. It's better with some data.
>>>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>>>> <pi...@data-artisans.com> wrote:
>>>>> Hi,
>>>>> Thanks for sharing this job.
>>>>> Do I need to feed some data to the Kafka to reproduce this
>>>> issue with your script?
>>>>>> Does this OOM issue also happen when you are not using the
>>>> Kafka source/sink?
>>>>>> Piotrek
>>>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
>>>> wrote:
>>>>>> Hi,
>>>>>> This is the test flink job we created to trigger this leak
>>>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>>>> And this is the python script we are using to execute the job
>>>> thousands of times to get the OOM problem
>>>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>>>> The cluster we used for this has this configuration:
>>>>>> Instance type: t2.large
>>>>>> Number of workers: 2
>>>>>> HeapMemory: 5500
>>>>>> Number of task slots per node: 4
>>>>>> TaskMangMemFraction: 0.5
>>>>>> NumberOfNetworkBuffers: 2000
>>>>>> We have tried several things, increasing the heap, reducing the
>>>> heap, more memory fraction, changes this value in the
>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>>>> work.
>>>>>> Thanks for your help.
>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>> Hi Ebru and Javier,
>>>>> Yes, if you could share this example job it would be helpful.
>>>>> Ebru: could you explain in a little more details how does
>>>> your Job(s)
>>>>> look like? Could you post some code? If you are just using
>>>> maps and
>>>>> filters there shouldn’t be any network transfers involved,
>>>> aside
>>>>> from Source and Sink functions.
>>>>> Piotrek
>>>>> On 8 Nov 2017, at 12:54, ebru
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> Hi Javier,
>>>>> It would be helpful if you share your test job with us.
>>>>> Which configurations did you try?
>>>>> -Ebru
>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>> <javier.lo...@zalando.de>
>>>>> wrote:
>>>>> Hi,
>>>>> We have been facing a similar problem. We have tried some
>>>> different
>>>>> configurations, as proposed in other email thread by Flavio
>>>> and
>>>>> Kien, but it didn't work. We have a workaround similar to
>>>> the one
>>>>> that Flavio has, we restart the taskmanagers once they reach
>>>> a
>>>>> memory threshold. We created a small test to remove all of
>>>> our
>>>>> dependencies and leave only flink native libraries. This
>>>> test reads
>>>>> data from a Kafka topic and writes it back to another topic
>>>> in
>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>> After
>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>> OS memory
>>>>> limit and dies.
>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>> slots
>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>> execute
>>>>> that job 5 times in a row because the whole cluster dies. If
>>>> you
>>>>> want, we can publish our test job.
>>>>> Regards,
>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>> <aljos...@apache.org>
>>>>> wrote:
>>>>> @Nico & @Piotr Could you please have a look at this? You
>>>> both
>>>>> recently worked on the network stack and might be most
>>>> familiar with
>>>>> this.
>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>> <pomperma...@okkam.it>
>>>>> wrote:
>>>>> We also have the same problem in production. At the moment
>>>> the

Re: Flink memory leak

2017-11-10 Thread Piotr Nowojski
>> thousands of times to get the OOM problem
>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>> The cluster we used for this has this configuration:
>>>> Instance type: t2.large
>>>> Number of workers: 2
>>>> HeapMemory: 5500
>>>> Number of task slots per node: 4
>>>> TaskMangMemFraction: 0.5
>>>> NumberOfNetworkBuffers: 2000
>>>> We have tried several things, increasing the heap, reducing the
>> heap, more memory fraction, changes this value in the
>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>> work.
>>>> Thanks for your help.
>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>> Hi Ebru and Javier,
>>> Yes, if you could share this example job it would be helpful.
>>> Ebru: could you explain in a little more details how does
>> your Job(s)
>>> look like? Could you post some code? If you are just using
>> maps and
>>> filters there shouldn’t be any network transfers involved,
>> aside
>>> from Source and Sink functions.
>>> Piotrek
>>> On 8 Nov 2017, at 12:54, ebru
>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> Hi Javier,
>>> It would be helpful if you share your test job with us.
>>> Which configurations did you try?
>>> -Ebru
>>> On 8 Nov 2017, at 14:43, Javier Lopez
>> <javier.lo...@zalando.de>
>>> wrote:
>>> Hi,
>>> We have been facing a similar problem. We have tried some
>> different
>>> configurations, as proposed in other email thread by Flavio
>> and
>>> Kien, but it didn't work. We have a workaround similar to
>> the one
>>> that Flavio has, we restart the taskmanagers once they reach
>> a
>>> memory threshold. We created a small test to remove all of
>> our
>>> dependencies and leave only flink native libraries. This
>> test reads
>>> data from a Kafka topic and writes it back to another topic
>> in
>>> Kafka. We cancel the job and start another every 5 seconds.
>> After
>>> ~30 minutes of doing this process, the cluster reaches the
>> OS memory
>>> limit and dies.
>>> Currently, we have a test cluster with 8 workers and 8 task
>> slots
>>> per node. We have one job that uses 56 slots, and we cannot
>> execute
>>> that job 5 times in a row because the whole cluster dies. If
>> you
>>> want, we can publish our test job.
>>> Regards,
>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>> <aljos...@apache.org>
>>> wrote:
>>> @Nico & @Piotr Could you please have a look at this? You
>> both
>>> recently worked on the network stack and might be most
>> familiar with
>>> this.
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>> <pomperma...@okkam.it>
>>> wrote:
>>> We also have the same problem in production. At the moment
>> the
>>> solution is to restart the entire Flink cluster after every
>> job..
>>> We've tried to reproduce this problem with a test (see
>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we
>> don't
>>> know whether the error produced by the test and the leak are
>>> correlated..
>>> Best,
>>> Flavio
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>> EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share
>> that code?
>>> If
>>> there is no stateful operation at all, it's strange where
>> the list
>>> state instances are coming from.
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>> <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>> Hi Ufuk,
>>> We don’t explicitly define any state descriptor. We only
>> use map
>>> and filters
>>> operator. We thought that gc handle clearing the flink’s
>> internal
>>> states.
>>> So how can we manage the memory if it is always increasing?
>>> - Ebru
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>> Hey Ebru, the memory usage might be increasing as long as a
>> job is
>>> running.
>>> This is expected (also in the case of multiple running
>> jobs). The
>>> screenshots are not helpful in that regard. :-(
>>> What kind of stateful operations are you using?

Re: Flink memory leak

2017-11-09 Thread Piotr Nowojski
>>>>>> HeapMemory: 5500
>>>>>> Number of task slots per node: 4
>>>>>> TaskMangMemFraction: 0.5
>>>>>> NumberOfNetworkBuffers: 2000
>>>>>> We have tried several things, increasing the heap, reducing the
>>>> heap, more memory fraction, changes this value in the
>>>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>>>> work.
>>>>>> Thanks for your help.
>>>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>>>>>>> Hi Ebru and Javier,
>>>>>>>> Yes, if you could share this example job it would be helpful.
>>>>>>>> Ebru: could you explain in a little more details how does
>>>> your Job(s)
>>>>>>>> look like? Could you post some code? If you are just using
>>>> maps and
>>>>>>>> filters there shouldn’t be any network transfers involved,
>>>> aside
>>>>>>>> from Source and Sink functions.
>>>>>>>> Piotrek
>>>>>>>>> On 8 Nov 2017, at 12:54, ebru
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>>>>> Hi Javier,
>>>>>>>>> It would be helpful if you share your test job with us.
>>>>>>>>> Which configurations did you try?
>>>>>>>>> -Ebru
>>>>>>>>> On 8 Nov 2017, at 14:43, Javier Lopez
>>>> <javier.lo...@zalando.de>
>>>>>>>>> wrote:
>>>>>>>>> Hi,
>>>>>>>>> We have been facing a similar problem. We have tried some
>>>> different
>>>>>>>>> configurations, as proposed in other email thread by Flavio
>>>> and
>>>>>>>>> Kien, but it didn't work. We have a workaround similar to
>>>> the one
>>>>>>>>> that Flavio has, we restart the taskmanagers once they reach
>>>> a
>>>>>>>>> memory threshold. We created a small test to remove all of
>>>> our
>>>>>>>>> dependencies and leave only flink native libraries. This
>>>> test reads
>>>>>>>>> data from a Kafka topic and writes it back to another topic
>>>> in
>>>>>>>>> Kafka. We cancel the job and start another every 5 seconds.
>>>> After
>>>>>>>>> ~30 minutes of doing this process, the cluster reaches the
>>>> OS memory
>>>>>>>>> limit and dies.
>>>>>>>>> Currently, we have a test cluster with 8 workers and 8 task
>>>> slots
>>>>>>>>> per node. We have one job that uses 56 slots, and we cannot
>>>> execute
>>>>>>>>> that job 5 times in a row because the whole cluster dies. If
>>>> you
>>>>>>>>> want, we can publish our test job.
>>>>>>>>> Regards,
>>>>>>>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>>>> <aljos...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>> @Nico & @Piotr Could you please have a look at this? You
>>>> both
>>>>>>>>> recently worked on the network stack and might be most
>>>> familiar with
>>>>>>>>> this.
>>>>>>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>>>> <pomperma...@okkam.it>
>>>>>>>>> wrote:
>>>>>>>>> We also have the same problem in production. At the moment
>>>> the
>>>>>>>>> solution is to restart the entire Flink cluster after every
>>>> job..
>>>>>>>>> We've tried to reproduce this problem with a test (see
>>>>>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we
>>>> don't
>>>>>>>>> know whether the error produced by the test and the leak are
>>>>>>>>> correlated..
>>>>>>>>> Best,
>>>>>>>>> Flavio
>>>>>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>>>> EBRU

Re: Flink memory leak

2017-11-09 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU
On 8 November 2017 at 11:20, Aljoscha Krettek

<aljos...@apache.org>

wrote:

@Nico & @Piotr Could you please have a look at this? You

both

recently worked on the network stack and might be most

familiar with

this.

On 8. Nov 2017, at 10:25, Flavio Pompermaier

<pomperma...@okkam.it>

wrote:

We also have the same problem in production. At the moment

the

solution is to restart the entire Flink cluster after every

job..

We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we

don't

know whether the error produced by the test and the leak are
correlated..

Best,
Flavio

On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA

EBRU

<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any windowing? If yes, could you please share

that code?

If
there is no stateful operation at all, it's strange where

the list

state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru

<b20926...@cs.hacettepe.edu.tr>

wrote:
Hi Ufuk,

We don’t explicitly define any state descriptor. We only

use map

and filters
operator. We thought that gc handle clearing the flink’s

internal

states.
So how can we manage the memory if it is always increasing?

- Ebru

On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:

Hey Ebru, the memory usage might be increasing as long as a

job is

running.
This is expected (also in the case of multiple running

jobs). The

screenshots are not helpful in that regard. :-(

What kind of stateful operations are you using? Depending on your use case,
you have to manually call `clear()` on the state instance in order to
release the managed state.
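
As a sketch of what that advice looks like in code (illustrative only, not taken from the jobs discussed in this thread): a keyed function that keeps a running count in managed ValueState and calls clear() once it is finished with a key, so the state backend can release that entry.

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Must run on a keyed stream, e.g. stream.keyBy(...).flatMap(new CountUntilThreshold())
public class CountUntilThreshold extends RichFlatMapFunction<String, String> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        if (updated >= 1000) {
            out.collect("threshold reached for key");
            count.clear();   // release the managed state for this key
        } else {
            count.update(updated);
        }
    }
}
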

Best,

Ufuk

On Tue, Nov 7, 2017 at 12:43 PM, ebru
<b20926...@cs.hacettepe.edu.tr> wrote:

Begin forwarded message:

From: ebru <b20926...@cs.hacettepe.edu.tr>
Subject: Re: Flink memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi <u...@apache.org>

Hi Ufuk,

There are three snapshots of htop output.
1. snapshot is initial state.
2. snapshot is after submitted one job.
3. Snapshot is the output of the one job with 15000 EPS. And

the

memory
usage is always increasing over time.

<1.png><2.png><3.png>

On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:

Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's

causing

this.

Since multiple jobs are running, it will be hard to

understand to

which job the state descriptors from the heap snapshot

belong to.

- Is it possible to isolate the problem and reproduce the

behaviour

with only a single job?

– Ufuk

On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU

ÇETİNKAYA EBRU

<b20926...@cs.hacettepe.edu.tr> wrote:

Hi,

We are using Flink 1.3.1 in production; we have one job manager and 3 task
managers in standalone mode. Recently, we've noticed that we have
memory-related problems. We use docker containers to serve the Flink cluster.
We have 300 slots and 20 jobs are running with a parallelism of 10. Also, the
job count may change over time. Taskmanager memory usage always increases.
After job cancellation this memory usage doesn't decrease. We've tried to
investigate the problem and we've got the task manager jvm heap snapshot.
According to the jvm heap analysis, the possible memory leak was Flink's list
state descriptor. But we are not sure that is the cause of our memory problem.
How can we solve the problem?

We have two types of Flink job. One has no stateful operator and contains only
maps and filters; the other has a time window with a count trigger.
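
A minimal sketch of that second job shape (a keyed time window whose default trigger is replaced by a count trigger), illustrative only, with a placeholder source and sink instead of the actual Kafka, filesystem and Elasticsearch setup:

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class WindowedBufferJobSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; the real job reads from Kafka.
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        events
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value.split(",")[0]; // placeholder key extraction
                }
            })
            .timeWindow(Time.minutes(1))        // buffer elements per key and time window
            .trigger(CountTrigger.of(1000))     // fire once 1000 elements have arrived
            .reduce((a, b) -> a + "\n" + b)     // placeholder aggregation of the buffered batch
            .print();                           // real job: filesystem and Elasticsearch sinks

        env.execute("windowed-buffer-sketch");
    }
}

With a count trigger replacing the default time-based trigger, the window only fires when the count is reached; in the meantime the buffered elements live in Flink's managed window state.
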


* We've analysed the jvm heaps again in different conditions. First we
analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
* Then, we analysed the jvm heap snapshot when the Flink job that has no
stateful operator is running. According to the results, the leak suspect was
NetworkBufferPool. (image 2)
* In the last analysis, both types of jobs were running and the leak suspect
was again NetworkBufferPool. (image 3)

In our system jobs are regularly cancelled and resubmitted, so we noticed that
when a job is submitted some amount of memory is allocated, and after
cancellation this allocated memory is never freed. So over time memory usage
is always increasing and eventually exceeds the limits.







Links:
--
[1] https://issues.apache.org/jira/browse/FLINK-7845


Hi Piotr,

There are two types of jobs.
In the first, we use a Kafka source and a Kafka sink; there isn't any window
operator.
In the second job, we use a Kafka source, a filesystem sink and an
Elasticsearch sink, with a window operator for buffering.








Hi Piotrek,

Thanks for your reply.

We've tested our Flink cluster again. We have 360 slots, and our cluster
configuration is like this:


jobmanager.rpc.address: %JOBMANAGER%
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1536
taskmanager.heap.mb: 1536
taskmanager.numberOfTaskSlots: 120
taskmanager.memory.preallocate: false
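
(A rough back-of-the-envelope, assuming the 360 slots are spread over three
task managers at 120 slots each and that the heap is shared evenly across
slots: with taskmanager.heap.mb: 1536, each slot gets on the order of
1536 MB / 120 ≈ 13 MB of heap, so even a small amount of per-job memory that
is not released on cancellation will exhaust a task manager quickly.)
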

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
>>>
>> >>>> Hi Ebru and Javier,
>> >>>>
>> >>>> Yes, if you could share this example job it would be helpful.
>> >>>>
>> >>>> Ebru: could you explain in a little more details how does your Job(s)
>> >>>> look like? Could you post some code? If you are just using maps and
>> >>>> filters there shouldn’t be any network transfers involved, aside
>> >>>> from Source and Sink functions.
>> >>>>
>> >>>> Piotrek
>> >>>>
>> >>>>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr 
>> >>>>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> >>>>>
>> >>>>> Hi Javier,
>> >>>>>
>> >>>>> It would be helpful if you share your test job with us.
>> >>>>> Which configurations did you try?
>> >>>>>
>> >>>>> -Ebru
>> >>>>>
>> >>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
>> >>>>> <mailto:javier.lo...@zalando.de>>
>> >>>>> wrote:
>> >>>>>
>> >>>>> Hi,
>> >>>>>
>> >>>>> We have been facing a similar problem. We have tried some different
>> >>>>> configurations, as proposed in other email thread by Flavio and
>> >>>>> Kien, but it didn't work. We have a workaround similar to the one
>> >>>>> that Flavio has, we restart the taskmanagers once they reach a
>> >>>>> memory threshold. We created a small test to remove all of our
>> >>>>> dependencies and leave only flink native libraries. This test reads
>> >>>>> data from a Kafka topic and writes it back to another topic in
>> >>>>> Kafka. We cancel the job and start another every 5 seconds. After
>> >>>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>> >>>>> limit and dies.
>> >>>>>
>> >>>>> Currently, we have a test cluster with 8 workers and 8 task slots
>> >>>>> per node. We have one job that uses 56 slots, and we cannot execute
>> >>>>> that job 5 times in a row because the whole cluster dies. If you
>> >>>>> want, we can publish our test job.
>> >>>>>
>> >>>>> Regards,
>> >>>>>
>> >>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
>> >>>>> <mailto:aljos...@apache.org>>
>> >>>>> wrote:
>> >>>>>
>> >>>>> @Nico & @Piotr Could you please have a look at this? You both
>> >>>>> recently worked on the network stack and might be most familiar with
>> >>>>> this.
>> >>>>>
>> >>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
>> >>>>> <mailto:pomperma...@okkam.it>>
>> >>>>> wrote:
>> >>>>>
>> >>>>> We also have the same problem in production. At the moment the
>> >>>>> solution is to restart the entire Flink cluster after every job..
>> >>>>> We've tried to reproduce this problem with a test (see
>> >>>>> https://issues.apache.org/jira/browse/FLINK-7845 
>> >>>>> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we don't
>> >>>>> know whether the error produced by the test and the leak are
>> >>>>> correlated..
>> >>>>>
>> >>>>> Best,
>> >>>>> Flavio
>> >>>>>
>> >>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> >>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>> >>>>> wrote:
>> >>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>> >>>>> Do you use any windowing? If yes, could you please share that code?
>> >>>>> If
>> >>>>> there is no stateful operation at all, it's strange where the list
>> >>>>> state instances are coming from.
>> >>>>>
>> >>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
 >>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
> >>>>> <mailto:javier.lo...@zalando.de>>
> >>>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> We have been facing a similar problem. We have tried some different
> >>>>> configurations, as proposed in other email thread by Flavio and
> >>>>> Kien, but it didn't work. We have a workaround similar to the one
> >>>>> that Flavio has, we restart the taskmanagers once they reach a
> >>>>> memory threshold. We created a small test to remove all of our
> >>>>> dependencies and leave only flink native libraries. This test reads
> >>>>> data from a Kafka topic and writes it back to another topic in
> >>>>> Kafka. We cancel the job and start another every 5 seconds. After
> >>>>> ~30 minutes of doing this process, the cluster reaches the OS memory
> >>>>> limit and dies.
> >>>>>
> >>>>> Currently, we have a test cluster with 8 workers and 8 task slots
> >>>>> per node. We have one job that uses 56 slots, and we cannot execute
> >>>>> that job 5 times in a row because the whole cluster dies. If you
> >>>>> want, we can publish our test job.
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
> >>>>> <mailto:aljos...@apache.org>>
> >>>>> wrote:
> >>>>>
> >>>>> @Nico & @Piotr Could you please have a look at this? You both
> >>>>> recently worked on the network stack and might be most familiar with
> >>>>> this.
> >>>>>
> >>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
> >>>>> <mailto:pomperma...@okkam.it>>
> >>>>> wrote:
> >>>>>
> >>>>> We also have the same problem in production. At the moment the
> >>>>> solution is to restart the entire Flink cluster after every job..
> >>>>> We've tried to reproduce this problem with a test (see
> >>>>> https://issues.apache.org/jira/browse/FLINK-7845 
> >>>>> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we don't
> >>>>> know whether the error produced by the test and the leak are
> >>>>> correlated..
> >>>>>
> >>>>> Best,
> >>>>> Flavio
> >>>>>
> >>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> >>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
> >>>>> wrote:
> >>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
> >>>>> Do you use any windowing? If yes, could you please share that code?
> >>>>> If
> >>>>> there is no stateful operation at all, it's strange where the list
> >>>>> state instances are coming from.
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> >>>>> <mailto:b20926...@cs.hacettepe.edu.tr>>
> >>>>> wrote:
> >>>>> Hi Ufuk,
> >>>>>
> >>>>> We don’t explicitly define any state descriptor. We only use map
> >>>>> and filters
> >>>>> operator. We thought that gc handle clearing the flink’s internal
> >>>>> states.
> >>>>> So how can we manage the memory if it is always increasing?
> >>>>>
> >>>>> - Ebru
> >>>>>
> >>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
> >>>>> <mailto:u...@apache.org>> wrote:
> >>>>>
> >>>>> Hey Ebru, the memory usage might be increasing as long as a job is
> >>>>> running.
> >>>>> This is expected (also in the case of multiple running jobs). The
> >>>>> screenshots are not helpful in that regard. :-(
> >>>>>
> >>>>> What kind of stateful operations are you using? Depending on your
> >>>>> use case,
> >>>>> you have to manually call `clear()` on the state instance in order
> >>>>> to

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
We also have the same problem in production. At the moment the
>>>>> solution is to restart the entire Flink cluster after every job..
>>>>> We've tried to reproduce this problem with a test (see
>>>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>>> know whether the error produced by the test and the leak are
>>>>> correlated..
>>>>>
>>>>> Best,
>>>>> Flavio
>>>>>
>>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>>>> Do you use any windowing? If yes, could you please share that code?
>>>>> If
>>>>> there is no stateful operation at all, it's strange where the list
>>>>> state instances are coming from.
>>>>>
>>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> wrote:
>>>>> Hi Ufuk,
>>>>>
>>>>> We don’t explicitly define any state descriptor. We only use map
>>>>> and filters
>>>>> operator. We thought that gc handle clearing the flink’s internal
>>>>> states.
>>>>> So how can we manage the memory if it is always increasing?
>>>>>
>>>>> - Ebru
>>>>>
>>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>>> running.
>>>>> This is expected (also in the case of multiple running jobs). The
>>>>> screenshots are not helpful in that regard. :-(
>>>>>
>>>>> What kind of stateful operations are you using? Depending on your
>>>>> use case,
>>>>> you have to manually call `clear()` on the state instance in order
>>>>> to
>>>>> release the managed state.
>>>>>
>>>>> Best,
>>>>>
>>>>> Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> There are three snapshots of htop output.
>>>>> 1. snapshot is initial state.
>>>>> 2. snapshot is after submitted one job.
>>>>> 3. Snapshot is the output of the one job with 15000 EPS. And the
>>>>> memory
>>>>> usage is always increasing over time.
>>>>>
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru,
>>>>>
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>> this.
>>>>>
>>>>> Since multiple jobs are running, it will be hard to understand to
>>>>> which job the state descriptors from the heap snapshot belong to.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>> with only a single job?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> We are using Flink 1.3.1 in production, we have one job manager and
>>>>> 3 task
>>>>> managers in standalone mode. Recently, we've noticed that we have
>>>>> memory
>>>>> related problems. We use docker container to serve Flink cluster. We
>>>>> have
>>>>> 300 slots and 20 jobs are running with parallelism of 10. Also the
>>>>> job
>>>>> count
>>>>> may change over time. Taskmanager memory usage always increases.
>>>>> After
>>>>> job cancelation this memory usage doesn't decrease. We've tried to
>>>>> investigate the problem and we've got the task manager jvm heap
>>>>> snapshot.
>>>>> According to the jvm heap analysis, possible memory leak was Flink
>>>>> list
>>>>> state descriptor. But we are not sure that is the cause of our
>>>>> memory
>>>>> problem. How can we solve the problem?
>>>>>
>>>>> We have two types of Flink job. One has no state full operator
>>>>> contains only maps and filters and the other has time window with
>>>>> count trigger.
>>>>
>>>>  * We've analysed the jvm heaps again in different conditions. First
>>>> we analysed the snapshot when no flink jobs running on cluster. (image
>>>> 1)
>>>> * Then, we analysed the jvm heap snapshot when the flink job that has
>>>> no state full operator is running. And according to the results, leak
>>>> suspect was NetworkBufferPool (image 2)
>>>> *   In the last analysis, both types of jobs were running and the leak
>>>> suspect was again NetworkBufferPool. (image 3)
>>>> In our system jobs are regularly cancelled and resubmitted so we
>>>> noticed that when job is submitted some amount of memory allocated and
>>>> after cancelation this allocated memory never freed. So over time
>>>> memory usage is always increasing and exceeded the limits.
>>>>
>>>>>>
>>>>
>>>>
>>>>
>>>> Links:
>>>> --
>>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>
>>> Hi Piotr,
>>>
>>> There are two types of jobs.
>>> In first, we use Kafka source and Kafka sink, there isn't any window
operator.
>>> In second job, we use Kafka source, filesystem sink and elastic search
sink and window operator for buffering.
>>
>>
>
>
>


Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
>> https://issues.apache.org/jira/browse/FLINK-7845 
>> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we don't
>> 
>> know whether the error produced by the test and the leak are
>> correlated..
>> 
>> Best,
>> Flavio
>> 
>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>> Do you use any windowing? If yes, could you please share that code?
>> If
>> there is no stateful operation at all, it's strange where the list
>> state instances are coming from.
>> 
>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>>
>> wrote:
>> Hi Ufuk,
>> 
>> We don’t explicitly define any state descriptor. We only use map
>> and filters
>> operator. We thought that gc handle clearing the flink’s internal
>> states.
>> So how can we manage the memory if it is always increasing?
>> 
>> - Ebru
>> 
>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
>> <mailto:u...@apache.org>> wrote:
>> 
>> Hey Ebru, the memory usage might be increasing as long as a job is
>> running.
>> This is expected (also in the case of multiple running jobs). The
>> screenshots are not helpful in that regard. :-(
>> 
>> What kind of stateful operations are you using? Depending on your
>> use case,
>> you have to manually call `clear()` on the state instance in order
>> to
>> release the managed state.
>> 
>> Best,
>> 
>> Ufuk
>> 
>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> 
>> Begin forwarded message:
>> 
>> From: ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>>
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
>> 
>> Hi Ufuk,
>> 
>> There are three snapshots of htop output.
>> 1. snapshot is initial state.
>> 2. snapshot is after submitted one job.
>> 3. Snapshot is the output of the one job with 15000 EPS. And the
>> memory
>> usage is always increasing over time.
>> 
>> <1.png><2.png><3.png>
>> 
>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
>> <mailto:u...@apache.org>> wrote:
>> 
>> Hey Ebru,
>> 
>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>> this.
>> 
>> Since multiple jobs are running, it will be hard to understand to
>> which job the state descriptors from the heap snapshot belong to.
>> - Is it possible to isolate the problem and reproduce the behaviour
>> with only a single job?
>> 
>> – Ufuk
>> 
>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> 
>> Hi,
>> 
>> We are using Flink 1.3.1 in production, we have one job manager and
>> 3 task
>> managers in standalone mode. Recently, we've noticed that we have
>> memory
>> related problems. We use docker container to serve Flink cluster. We
>> have
>> 300 slots and 20 jobs are running with parallelism of 10. Also the
>> job
>> count
>> may change over time. Taskmanager memory usage always increases.
>> After
>> job cancelation this memory usage doesn't decrease. We've tried to
>> investigate the problem and we've got the task manager jvm heap
>> snapshot.
>> According to the jvm heap analysis, possible memory leak was Flink
>> list
>> state descriptor. But we are not sure that is the cause of our
>> memory
>> problem. How can we solve the problem?
>> 
>> We have two types of Flink job. One has no stateful operator and
>> contains only maps and filters; the other has a time window with a
>> count trigger.
>>  * We've analysed the JVM heaps again in different conditions. First
>> we analysed the snapshot when no Flink jobs were running on the cluster.
>> (image 1)
>> * Then, we analysed the JVM heap snapshot while the Flink job that has
>> no stateful operator was running. According to the results, the leak
>> suspect was NetworkBufferPool. (image 2)
>> * In the last analysis, both types of jobs were running and the leak
>> suspect was again NetworkBufferPool. (image 3)
>> In our system jobs are regularly cancelled and resubmitted, and we
>> noticed that when a job is submitted some amount of memory is allocated
>> and after cancelation this memory is never freed. So over time memory
>> usage keeps increasing and exceeds the limits.
>> 
>> 
>> 
>> 
>> 
>> Links:
>> --
>> [1] https://issues.apache.org/jira/browse/FLINK-7845 
>> <https://issues.apache.org/jira/browse/FLINK-7845>
>> Hi Piotr,
>> 
>> There are two types of jobs.
>> In the first, we use a Kafka source and a Kafka sink; there isn't any
>> window operator.
>> In the second job, we use a Kafka source, a filesystem sink and an
>> Elasticsearch sink, plus a window operator for buffering.
>> 
> 
> 



Re: Flink memory leak

2017-11-08 Thread Javier Lopez
>>>> Do you use any windowing? If yes, could you please share that code?
>>>> If
>>>> there is no stateful operation at all, it's strange where the list
>>>> state instances are coming from.
>>>>
>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>>> wrote:
>>>> Hi Ufuk,
>>>>
>>>> We don’t explicitly define any state descriptor. We only use map
>>>> and filter operators. We thought that the GC handles clearing Flink’s
>>>> internal state.
>>>> So how can we manage the memory if it is always increasing?
>>>>
>>>> - Ebru
>>>>
>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>> running.
>>>> This is expected (also in the case of multiple running jobs). The
>>>> screenshots are not helpful in that regard. :-(
>>>>
>>>> What kind of stateful operations are you using? Depending on your
>>>> use case,
>>>> you have to manually call `clear()` on the state instance in order
>>>> to
>>>> release the managed state.
>>>>
>>>> Best,
>>>>
>>>> Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>
>>>> Begin forwarded message:
>>>>
>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>> Subject: Re: Flink memory leak
>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>> To: Ufuk Celebi <u...@apache.org>
>>>>
>>>> Hi Ufuk,
>>>>
>>>> There are three snapshots of htop output.
>>>> 1. The first snapshot is the initial state.
>>>> 2. The second is after one job has been submitted.
>>>> 3. The third is during that job running at 15000 EPS, and the
>>>> memory usage is always increasing over time.
>>>>
>>>> <1.png><2.png><3.png>
>>>>
>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> Hey Ebru,
>>>>
>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>> this.
>>>>
>>>> Since multiple jobs are running, it will be hard to understand to
>>>> which job the state descriptors from the heap snapshot belong to.
>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>> with only a single job?
>>>>
>>>> – Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using Flink 1.3.1 in production, we have one job manager and
>>>> 3 task
>>>> managers in standalone mode. Recently, we've noticed that we have
>>>> memory
>>>> related problems. We use Docker containers to serve the Flink cluster. We
>>>> have 300 slots and 20 jobs running with a parallelism of 10. The job
>>>> count may change over time. Taskmanager memory usage always increases, and
>>>> after job cancelation this memory usage doesn't decrease. We've tried to
>>>> investigate the problem and we've taken a task manager JVM heap snapshot.
>>>> According to the JVM heap analysis, the possible memory leak was Flink's
>>>> list state descriptor. But we are not sure that is the cause of our
>>>> memory problem. How can we solve the problem?
>>>>
>>>> We have two types of Flink job. One has no stateful operator and
>>>> contains only maps and filters; the other has a time window with a
>>>> count trigger.
>>>>
>>>  * We've analysed the JVM heaps again in different conditions. First
>>> we analysed the snapshot when no Flink jobs were running on the cluster.
>>> (image 1)
>>> * Then, we analysed the JVM heap snapshot while the Flink job that has
>>> no stateful operator was running. According to the results, the leak
>>> suspect was NetworkBufferPool. (image 2)
>>> * In the last analysis, both types of jobs were running and the leak
>>> suspect was again NetworkBufferPool. (image 3)
>>> In our system jobs are regularly cancelled and resubmitted, and we
>>> noticed that when a job is submitted some amount of memory is allocated
>>> and after cancelation this memory is never freed. So over time
>>> memory usage keeps increasing and exceeds the limits.
>>>
>>>
>>>>>
>>>
>>>
>>> Links:
>>> --
>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>>>
>> Hi Piotr,
>>
>> There are two types of jobs.
>> In the first, we use a Kafka source and a Kafka sink; there isn't any window
>> operator.
>> In the second job, we use a Kafka source, a filesystem sink and an
>> Elasticsearch sink, plus a window operator for buffering.
>>
>
>
>


Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is
> running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your
> use case,
> you have to manually call `clear()` on the state instance in order
> to
> release the managed state.
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 7, 2017 at 12:43 PM, ebru
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Begin forwarded message:
> 
> From: ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>>
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
> 
> Hi Ufuk,
> 
> There are three snapshots of htop output.
> 1. The first snapshot is the initial state.
> 2. The second is after one job has been submitted.
> 3. The third is during that job running at 15000 EPS, and the
> memory usage is always increasing over time.
> 
> <1.png><2.png><3.png>
> 
> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> 
> Hey Ebru,
> 
> let me pull in Aljoscha (CC'd) who might have an idea what's causing
> this.
> 
> Since multiple jobs are running, it will be hard to understand to
> which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour
> with only a single job?
> 
> – Ufuk
> 
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Hi,
> 
> We are using Flink 1.3.1 in production, we have one job manager and
> 3 task
> managers in standalone mode. Recently, we've noticed that we have
> memory
> related problems. We use Docker containers to serve the Flink cluster. We
> have 300 slots and 20 jobs running with a parallelism of 10. The job
> count may change over time. Taskmanager memory usage always increases, and
> after job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've taken a task manager JVM heap snapshot.
> According to the JVM heap analysis, the possible memory leak was Flink's
> list state descriptor. But we are not sure that is the cause of our
> memory problem. How can we solve the problem?
> 
> We have two types of Flink job. One has no stateful operator and
> contains only maps and filters; the other has a time window with a
> count trigger.
>  * We've analysed the JVM heaps again in different conditions. First
> we analysed the snapshot when no Flink jobs were running on the cluster.
> (image 1)
> * Then, we analysed the JVM heap snapshot while the Flink job that has
> no stateful operator was running. According to the results, the leak
> suspect was NetworkBufferPool. (image 2)
> * In the last analysis, both types of jobs were running and the leak
> suspect was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted, and we
> noticed that when a job is submitted some amount of memory is allocated
> and after cancelation this memory is never freed. So over time
> memory usage keeps increasing and exceeds the limits.
> 
> 
> 
> 
> 
> Links:
> --
> [1] https://issues.apache.org/jira/browse/FLINK-7845 
> <https://issues.apache.org/jira/browse/FLINK-7845>
> Hi Piotr,
> 
> There are two types of jobs.
> In the first, we use a Kafka source and a Kafka sink; there isn't any window operator.
> In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink,
> plus a window operator for buffering.
> 
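[Editorial note: the "time window with count trigger" in ebru's second job, described above, is the kind of operator that keeps window contents in (list) state until the trigger fires. As a hedged illustration only, a minimal DataStream sketch could look like the following; the key, window size, trigger count, source and sink are made up, the real job reads from Kafka and writes to a filesystem and Elasticsearch:]

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

public class WindowBufferSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; the real job uses a Kafka source.
        DataStream<Tuple2<String, Integer>> events = env.fromElements(
                Tuple2.of("a", 1), Tuple2.of("a", 2), Tuple2.of("b", 3));

        events
            .keyBy(0)
            // Buffer elements per key in a 1-minute processing-time window ...
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            // ... but fire once 1000 elements have arrived; this replaces the
            // default time trigger, so the buffered elements sit in window state
            // until the count is reached.
            .trigger(CountTrigger.of(1000))
            .sum(1)
            .print(); // placeholder sink; the real job writes to a filesystem and Elasticsearch

        env.execute("window-buffer-sketch");
    }
}

[Such window state is owned and cleared by Flink itself when the window is purged; the question in this thread is why TaskManager memory still grows across job cancellations.]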



Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi,

This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
And this is the python script we are using to execute the job thousands of
times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
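[Editorial note: for readers who do not want to open the gists, a minimal sketch of such a Kafka-to-Kafka pass-through job might look roughly like the code below; the broker address, topic names and group id are placeholders, not necessarily the values used in the gist:]

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaPassThroughJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        props.setProperty("group.id", "leak-test");           // placeholder group id

        // Read strings from one topic and write them back unchanged to another;
        // no stateful operators are involved.
        DataStream<String> input = env.addSource(
                new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        input.addSink(
                new FlinkKafkaProducer010<>("kafka:9092", "output-topic", new SimpleStringSchema()));

        env.execute("kafka-pass-through-leak-test");
    }
}

[The leak reported here shows up not in the job itself but in the TaskManagers, after such a job has been cancelled and resubmitted many times.]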

The cluster we used for this has this configuration:

   - Instance type: t2.large
   - Number of workers: 2
   - HeapMemory: 5500
   - Number of task slots per node: 4
   - TaskMangMemFraction: 0.5
   - NumberOfNetworkBuffers: 2000

We have tried several things: increasing the heap, reducing the heap, a larger
memory fraction, and changing this value in taskmanager.sh:
TM_MAX_OFFHEAP_SIZE="2G"; nothing seems to work.

Thanks for your help.

On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926...@cs.hacettepe.edu.tr> wrote:

> On 2017-11-08 15:20, Piotr Nowojski wrote:
>
>> Hi Ebru and Javier,
>>
>> Yes, if you could share this example job it would be helpful.
>>
>> Ebru: could you explain in a little more detail what your job(s)
>> look like? Could you post some code? If you are just using maps and
>> filters there shouldn’t be any network transfers involved, aside
>> from Source and Sink functions.
>>
>> Piotrek
>>
>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>>
>>> Hi Javier,
>>>
>>> It would be helpful if you share your test job with us.
>>> Which configurations did you try?
>>>
>>> -Ebru
>>>
>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de>
>>> wrote:
>>>
>>> Hi,
>>>
>>> We have been facing a similar problem. We have tried some different
>>> configurations, as proposed in another email thread by Flavio and
>>> Kien, but it didn't work. We have a workaround similar to the one
>>> that Flavio has, we restart the taskmanagers once they reach a
>>> memory threshold. We created a small test to remove all of our
>>> dependencies and leave only flink native libraries. This test reads
>>> data from a Kafka topic and writes it back to another topic in
>>> Kafka. We cancel the job and start another every 5 seconds. After
>>> ~30 minutes of doing this process, the cluster reaches the OS memory
>>> limit and dies.
>>>
>>> Currently, we have a test cluster with 8 workers and 8 task slots
>>> per node. We have one job that uses 56 slots, and we cannot execute
>>> that job 5 times in a row because the whole cluster dies. If you
>>> want, we can publish our test job.
>>>
>>> Regards,
>>>
>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>> @Nico & @Piotr Could you please have a look at this? You both
>>> recently worked on the network stack and might be most familiar with
>>> this.
>>>
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>> We also have the same problem in production. At the moment the
>>> solution is to restart the entire Flink cluster after every job..
>>> We've tried to reproduce this problem with a test (see
>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
>>>
>>> know whether the error produced by the test and the leak are
>>> correlated..
>>>
>>> Best,
>>> Flavio
>>>
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share that code?
>>> If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>> Hi Ufuk,
>>>
>>> We don’t explicitly define any state descriptor. We only use map
>>> and filter operators. We thought that the GC handles clearing Flink’s
>>> internal state.
>>> So how can we manage the memory if it is always increasing?
>>>
>>> - Ebru
>>>
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>> running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(

Re: Flink memory leak

2017-11-08 Thread ÇETİNKAYA EBRU ÇETİNKAYA EBRU

On 2017-11-08 15:20, Piotr Nowojski wrote:

Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more detail what your job(s)
look like? Could you post some code? If you are just using maps and
filters there shouldn’t be any network transfers involved, aside
from Source and Sink functions.

Piotrek


On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:

Hi Javier,

It would be helpful if you share your test job with us.
Which configurations did you try?

-Ebru

On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de>
wrote:

Hi,

We have been facing a similar problem. We have tried some different
configurations, as proposed in other email thread by Flavio and
Kien, but it didn't work. We have a workaround similar to the one
that Flavio has, we restart the taskmanagers once they reach a
memory threshold. We created a small test to remove all of our
dependencies and leave only flink native libraries. This test reads
data from a Kafka topic and writes it back to another topic in
Kafka. We cancel the job and start another every 5 seconds. After
~30 minutes of doing this process, the cluster reaches the OS memory
limit and dies.

Currently, we have a test cluster with 8 workers and 8 task slots
per node. We have one job that uses 56 slots, and we cannot execute
that job 5 times in a row because the whole cluster dies. If you
want, we can publish our test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org>
wrote:

@Nico & @Piotr Could you please have a look at this? You both
recently worked on the network stack and might be most familiar with
this.

On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

We also have the same problem in production. At the moment the
solution is to restart the entire Flink cluster after every job..
We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't
know whether the error produced by the test and the leak are
correlated..

Best,
Flavio

On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any windowing? If yes, could you please share that code?
If
there is no stateful operation at all, it's strange where the list
state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
wrote:
Hi Ufuk,

We don’t explicitly define any state descriptor. We only use map
and filter operators. We thought that the GC handles clearing Flink’s
internal state.
So how can we manage the memory if it is always increasing?

- Ebru

On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:

Hey Ebru, the memory usage might be increasing as long as a job is
running.
This is expected (also in the case of multiple running jobs). The
screenshots are not helpful in that regard. :-(

What kind of stateful operations are you using? Depending on your
use case,
you have to manually call `clear()` on the state instance in order
to
release the managed state.

Best,

Ufuk

On Tue, Nov 7, 2017 at 12:43 PM, ebru
<b20926...@cs.hacettepe.edu.tr> wrote:

Begin forwarded message:

From: ebru <b20926...@cs.hacettepe.edu.tr>
Subject: Re: Flink memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi <u...@apache.org>

Hi Ufuk,

There are three snapshots of htop output.
1. The first snapshot is the initial state.
2. The second is after one job has been submitted.
3. The third is during that job running at 15000 EPS, and the
memory usage is always increasing over time.

<1.png><2.png><3.png>

On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:

Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's causing
this.

Since multiple jobs are running, it will be hard to understand to
which job the state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour
with only a single job?

– Ufuk

On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:

Hi,

We are using Flink 1.3.1 in production, we have one job manager and
3 task
managers in standalone mode. Recently, we've noticed that we have
memory
related problems. We use Docker containers to serve the Flink cluster. We
have 300 slots and 20 jobs running with a parallelism of 10. The job
count may change over time. Taskmanager memory usage always increases, and
after job cancelation this memory usage doesn't decrease. We've tried to
investigate the problem and we've taken a task manager JVM heap snapshot.
According to the JVM heap analysis, the possible memory leak was Flink's
list state descriptor. But we are not sure that is the cause of our
memory problem. How can we solve the problem?

We have two types of Flink job. One has no stateful operator and contains only
maps and filters; the other has a time window with a count trigger.

Re: Flink memory leak

2017-11-08 Thread Piotr Nowojski
Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more detail what your job(s) look
like? Could you post some code? If you are just using maps and filters there
shouldn’t be any network transfers involved, aside from Source and Sink 
functions.

Piotrek
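[Editorial note: to illustrate Piotr's point, in a pipeline without keyBy or repartitioning the source, map, filter and sink are chained into a single task per slot, so records never cross the network between operators. A hedged sketch, with placeholder source and sink, the real jobs read from Kafka:]

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapFilterOnlyJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "bb", "ccc")   // placeholder source
           .map(new MapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();   // stateless transformation
               }
           })
           .filter(new FilterFunction<String>() {
               @Override
               public boolean filter(String value) {
                   return value.length() > 1;    // stateless predicate
               }
           })
           .print();                             // placeholder sink

        env.execute("map-filter-only");
    }
}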

> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> Hi Javier,
> 
> It would be helpful if you share your test job with us. 
> Which configurations did you try?
> 
> -Ebru
>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
>> <mailto:javier.lo...@zalando.de>> wrote:
>> 
>> Hi,
>> 
>> We have been facing a similar problem. We have tried some different 
>> configurations, as proposed in another email thread by Flavio and Kien, but it 
>> didn't work. We have a workaround similar to the one that Flavio has, we 
>> restart the taskmanagers once they reach a memory threshold. We created a 
>> small test to remove all of our dependencies and leave only flink native 
>> libraries. This test reads data from a Kafka topic and writes it back to 
>> another topic in Kafka. We cancel the job and start another every 5 seconds. 
>> After ~30 minutes of doing this process, the cluster reaches the OS memory 
>> limit and dies. 
>> 
>> Currently, we have a test cluster with 8 workers and 8 task slots per node. 
>> We have one job that uses 56 slots, and we cannot execute that job 5 times 
>> in a row because the whole cluster dies. If you want, we can publish our 
>> test job.
>> 
>> Regards,
>> 
>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
>> <mailto:aljos...@apache.org>> wrote:
>> @Nico & @Piotr Could you please have a look at this? You both recently 
>> worked on the network stack and might be most familiar with this.
>> 
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
>>> <mailto:pomperma...@okkam.it>> wrote:
>>> 
>>> We also have the same problem in production. At the moment the solution is 
>>> to restart the entire Flink cluster after every job..
>>> We've tried to reproduce this problem with a test (see 
>>> https://issues.apache.org/jira/browse/FLINK-7845 
>>> <https://issues.apache.org/jira/browse/FLINK-7845>) but we don't know 
>>> whether the error produced by the test and the leak are correlated..
>>> 
>>> Best,
>>> Flavio
>>> 
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>> 
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
>>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>>> Hi Ufuk,
>>> 
>>> We don’t explicitly define any state descriptor. We only use map and filter
>>> operators. We thought that the GC handles clearing Flink’s internal state.
>>> So how can we manage the memory if it is always increasing?
>>> 
>>> - Ebru
>>> 
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
>>> <mailto:u...@apache.org>> wrote:
>>> 
>>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>> 
>>> What kind of stateful operations are you using? Depending on your use case,
>>> you have to manually call `clear()` on the state instance in order to
>>> release the managed state.
>>> 
>>> Best,
>>> 
>>> Ufuk
>>> 
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr 
>>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>>> 
>>> 
>>> 
>>> Begin forwarded message:
>>> 
>>> From: ebru <b20926...@cs.hacettepe.edu.tr 
>>> <mailto:b20926...@cs.hacettepe.edu.tr>>
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
>>> 
>>> Hi Ufuk,
>>> 
>>> There are three snapshots of htop output.

Re: Flink memory leak

2017-11-08 Thread ebru
Hi Javier,

It would be helpful if you share your test job with us. 
Which configurations did you try?

-Ebru
> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Hi,
> 
> We have been facing a similar problem. We have tried some different 
> configurations, as proposed in another email thread by Flavio and Kien, but it 
> didn't work. We have a workaround similar to the one that Flavio has, we 
> restart the taskmanagers once they reach a memory threshold. We created a 
> small test to remove all of our dependencies and leave only flink native 
> libraries. This test reads data from a Kafka topic and writes it back to 
> another topic in Kafka. We cancel the job and start another every 5 seconds. 
> After ~30 minutes of doing this process, the cluster reaches the OS memory 
> limit and dies. 
> 
> Currently, we have a test cluster with 8 workers and 8 task slots per node. 
> We have one job that uses 56 slots, and we cannot execute that job 5 times in 
> a row because the whole cluster dies. If you want, we can publish our test 
> job.
> 
> Regards,
> 
> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>> wrote:
> @Nico & @Piotr Could you please have a look at this? You both recently worked 
> on the network stack and might be most familiar with this.
> 
>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
>> <mailto:pomperma...@okkam.it>> wrote:
>> 
>> We also have the same problem in production. At the moment the solution is 
>> to restart the entire Flink cluster after every job..
>> We've tried to reproduce this problem with a test (see 
>> https://issues.apache.org/jira/browse/FLINK-7845 
>> <https://issues.apache.org/jira/browse/FLINK-7845>) but we don't know 
>> whether the error produced by the test and the leak are correlated..
>> 
>> Best,
>> Flavio
>> 
>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>> Do you use any windowing? If yes, could you please share that code? If
>> there is no stateful operation at all, it's strange where the list
>> state instances are coming from.
>> 
>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> Hi Ufuk,
>> 
>> We don’t explicitly define any state descriptor. We only use map and filter
>> operators. We thought that the GC handles clearing Flink’s internal state.
>> So how can we manage the memory if it is always increasing?
>> 
>> - Ebru
>> 
>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
>> <mailto:u...@apache.org>> wrote:
>> 
>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>> This is expected (also in the case of multiple running jobs). The
>> screenshots are not helpful in that regard. :-(
>> 
>> What kind of stateful operations are you using? Depending on your use case,
>> you have to manually call `clear()` on the state instance in order to
>> release the managed state.
>> 
>> Best,
>> 
>> Ufuk
>> 
>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
>> 
>> 
>> 
>> Begin forwarded message:
>> 
>> From: ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>>
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
>> 
>> Hi Ufuk,
>> 
>> There are three snapshots of htop output.
>> 1. The first snapshot is the initial state.
>> 2. The second is after one job has been submitted.
>> 3. The third is during that job running at 15000 EPS, and the memory
>> usage is always increasing over time.
>> 
>> 
>> 
>> 
>> <1.png><2.png><3.png>
>> 
>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
>> <mailto:u...@apache.org>> wrote:
>> 
>> Hey Ebru,
>> 
>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>> 
>> Since multiple jobs are running, it will be hard to understand to
>> which job the state descriptors from the heap snapshot belong to.
>> - Is it possible to isolate the problem and reproduce the behaviour
>> with only a single job?
>>

Re: Flink memory leak

2017-11-08 Thread Javier Lopez
Hi,

We have been facing a similar problem. We have tried some different
configurations, as proposed in another email thread by Flavio and Kien, but
it didn't work. We have a workaround similar to the one that Flavio has: we
restart the taskmanagers once they reach a memory threshold. We created a
small test to remove all of our dependencies and leave only flink native
libraries. This test reads data from a Kafka topic and writes it back to
another topic in Kafka. We cancel the job and start another every 5
seconds. After ~30 minutes of doing this process, the cluster reaches the
OS memory limit and dies.

Currently, we have a test cluster with 8 workers and 8 task slots per node.
We have one job that uses 56 slots, and we cannot execute that job 5 times
in a row because the whole cluster dies. If you want, we can publish our
test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:

> @Nico & @Piotr Could you please have a look at this? You both recently
> worked on the network stack and might be most familiar with this.
>
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>
> We also have the same problem in production. At the moment the solution is
> to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845) but we don't know
> whether the error produced by the test and the leak are correlated..
>
> Best,
> Flavio
>
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
> b20926...@cs.hacettepe.edu.tr> wrote:
>
>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>
>>> Do you use any windowing? If yes, could you please share that code? If
>>> there is no stateful operation at all, it's strange where the list
>>> state instances are coming from.
>>>
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>>
>>>> Hi Ufuk,
>>>>
>>>> We don’t explicitly define any state descriptor. We only use map and
>>>> filter operators. We thought that the GC handles clearing Flink’s
>>>> internal state.
>>>> So how can we manage the memory if it is always increasing?
>>>>
>>>> - Ebru
>>>>
>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>>> running.
>>>> This is expected (also in the case of multiple running jobs). The
>>>> screenshots are not helpful in that regard. :-(
>>>>
>>>> What kind of stateful operations are you using? Depending on your use
>>>> case,
>>>> you have to manually call `clear()` on the state instance in order to
>>>> release the managed state.
>>>>
>>>> Best,
>>>>
>>>> Ufuk
>>>>
>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>>> wrote:
>>>>
>>>>>
>>>>>
>>>>>
>>>>> Begin forwarded message:
>>>>>
>>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>>> Subject: Re: Flink memory leak
>>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>>> To: Ufuk Celebi <u...@apache.org>
>>>>>
>>>>> Hi Ufuk,
>>>>>
>>>>> There are three snapshots of htop output.
>>>>> 1. The first snapshot is the initial state.
>>>>> 2. The second is after one job has been submitted.
>>>>> 3. The third is during that job running at 15000 EPS, and the memory
>>>>> usage is always increasing over time.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> <1.png><2.png><3.png>
>>>>>
>>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>>
>>>>> Hey Ebru,
>>>>>
>>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>>> this.
>>>>>
>>>>> Since multiple jobs are running, it will be hard to understand to
>>>>> which job the state descriptors from the heap snapshot belong to.
>>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>>> with only a single job?
>>>>>
>>>>> – Ufuk
>>>>>
>>>>>
>>>>> On 

Re: Flink memory leak

2017-11-08 Thread Aljoscha Krettek
@Nico & @Piotr Could you please have a look at this? You both recently worked 
on the network stack and might be most familiar with this.

> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> 
> We also have the same problem in production. At the moment the solution is to 
> restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see 
> https://issues.apache.org/jira/browse/FLINK-7845 
> <https://issues.apache.org/jira/browse/FLINK-7845>) but we don't know whether 
> the error produced by the test and the leak are correlated..
> 
> Best,
> Flavio
> 
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code? If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> Hi Ufuk,
> 
> We don’t explicitly define any state descriptor. We only use map and filter
> operators. We thought that the GC handles clearing Flink’s internal state.
> So how can we manage the memory if it is always increasing?
> 
> - Ebru
> 
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your use case,
> you have to manually call `clear()` on the state instance in order to
> release the managed state.
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> 
> 
> Begin forwarded message:
> 
> From: ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>>
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
> 
> Hi Ufuk,
> 
> There are three snapshots of htop output.
> 1. The first snapshot is the initial state.
> 2. The second is after one job has been submitted.
> 3. The third is during that job running at 15000 EPS, and the memory
> usage is always increasing over time.
> 
> 
> 
> 
> <1.png><2.png><3.png>
> 
> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> 
> Hey Ebru,
> 
> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
> 
> Since multiple jobs are running, it will be hard to understand to
> which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour
> with only a single job?
> 
> – Ufuk
> 
> 
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Hi,
> 
> We are using Flink 1.3.1 in production, we have one job manager and 3 task
> managers in standalone mode. Recently, we've noticed that we have memory
> related problems. We use Docker containers to serve the Flink cluster. We have
> 300 slots and 20 jobs running with a parallelism of 10. The job count
> may change over time. Taskmanager memory usage always increases, and after
> job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've taken a task manager JVM heap snapshot.
> According to the JVM heap analysis, the possible memory leak was Flink's list
> state descriptor. But we are not sure that is the cause of our memory
> problem. How can we solve the problem?
> 
> 
> 
> We have two types of Flink job. One has no stateful operator and contains only 
> maps and filters; the other has a time window with a count trigger.
> * We've analysed the JVM heaps again in different conditions. First we 
> analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
> * Then, we analysed the JVM heap snapshot while the Flink job that has no 
> stateful operator was running. According to the results, the leak suspect 
> was NetworkBufferPool. (image 2)
> * In the last analysis, both types of jobs were running and the leak suspect 
> was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted, and we noticed that 
> when a job is submitted some amount of memory is allocated and after cancelation 
> this memory is never freed. So over time memory usage keeps 
> increasing and exceeds the limits.
> 
> 



Re: Flink memory leak

2017-11-08 Thread Flavio Pompermaier
We also have the same problem in production. At the moment the solution is
to restart the entire Flink cluster after every job..
We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845) but we don't know whether
the error produced by the test and the leak are correlated..

Best,
Flavio

On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <
b20926...@cs.hacettepe.edu.tr> wrote:

> On 2017-11-07 16:53, Ufuk Celebi wrote:
>
>> Do you use any windowing? If yes, could you please share that code? If
>> there is no stateful operation at all, it's strange where the list
>> state instances are coming from.
>>
>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>> wrote:
>>
>>> Hi Ufuk,
>>>
>>> We don’t explicitly define any state descriptor. We only use map and
>>> filter operators. We thought that the GC handles clearing Flink’s internal state.
>>> So how can we manage the memory if it is always increasing?
>>>
>>> - Ebru
>>>
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>>
>>> Hey Ebru, the memory usage might be increasing as long as a job is
>>> running.
>>> This is expected (also in the case of multiple running jobs). The
>>> screenshots are not helpful in that regard. :-(
>>>
>>> What kind of stateful operations are you using? Depending on your use
>>> case,
>>> you have to manually call `clear()` on the state instance in order to
>>> release the managed state.
>>>
>>> Best,
>>>
>>> Ufuk
>>>
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>>
>>>>
>>>>
>>>>
>>>> Begin forwarded message:
>>>>
>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>>> Subject: Re: Flink memory leak
>>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>>> To: Ufuk Celebi <u...@apache.org>
>>>>
>>>> Hi Ufuk,
>>>>
>>>> There are three snapshots of htop output.
>>>> 1. The first snapshot is the initial state.
>>>> 2. The second is after one job has been submitted.
>>>> 3. The third is during that job running at 15000 EPS, and the memory
>>>> usage is always increasing over time.
>>>>
>>>>
>>>>
>>>>
>>>> <1.png><2.png><3.png>
>>>>
>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>>>
>>>> Hey Ebru,
>>>>
>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
>>>> this.
>>>>
>>>> Since multiple jobs are running, it will be hard to understand to
>>>> which job the state descriptors from the heap snapshot belong to.
>>>> - Is it possible to isolate the problem and reproduce the behaviour
>>>> with only a single job?
>>>>
>>>> – Ufuk
>>>>
>>>>
>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>>>
>>>> Hi,
>>>>
>>>> We are using Flink 1.3.1 in production, we have one job manager and 3
>>>> task
>>>> managers in standalone mode. Recently, we've noticed that we have memory
>>>> related problems. We use Docker containers to serve the Flink cluster. We
>>>> have 300 slots and 20 jobs running with a parallelism of 10. The job
>>>> count may change over time. Taskmanager memory usage always increases, and
>>>> after job cancelation this memory usage doesn't decrease. We've tried to
>>>> investigate the problem and we've taken a task manager JVM heap
>>>> snapshot.
>>>> According to the JVM heap analysis, the possible memory leak was Flink's list
>>>> state descriptor. But we are not sure that is the cause of our memory
>>>> problem. How can we solve the problem?
>>>>
>>>>
>>>>
>>>> We have two types of Flink job. One has no stateful operator and contains
>>> only maps and filters; the other has a time window with a count trigger.
>>>
>> * We've analysed the JVM heaps again in different conditions. First we
> analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
> * Then, we analysed the JVM heap snapshot while the Flink job that has no
> stateful operator was running. According to the results, the leak suspect
> was NetworkBufferPool. (image 2)
> * In the last analysis, both types of jobs were running and the leak
> suspect was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted, and we noticed
> that when a job is submitted some amount of memory is allocated and after
> cancelation this memory is never freed. So over time memory usage
> keeps increasing and exceeds the limits.
>
>>


Re: Flink memory leak

2017-11-07 Thread Aljoscha Krettek
I agree with Ufuk, it would be helpful to know what stateful operations are in 
the jobs (including windowing).

> On 7. Nov 2017, at 14:53, Ufuk Celebi <u...@apache.org> wrote:
> 
> Do you use any windowing? If yes, could you please share that code? If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>> Hi Ufuk,
>> 
>> We don’t explicitly define any state descriptor. We only use map and filter
>> operators. We thought that the GC handles clearing Flink’s internal state.
>> So how can we manage the memory if it is always increasing?
>> 
>> - Ebru
>> 
>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>> 
>> Hey Ebru, the memory usage might be increasing as long as a job is running.
>> This is expected (also in the case of multiple running jobs). The
>> screenshots are not helpful in that regard. :-(
>> 
>> What kind of stateful operations are you using? Depending on your use case,
>> you have to manually call `clear()` on the state instance in order to
>> release the managed state.
>> 
>> Best,
>> 
>> Ufuk
>> 
>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>> 
>>> 
>>> 
>>> Begin forwarded message:
>>> 
>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi <u...@apache.org>
>>> 
>>> Hi Ufuk,
>>> 
>>> There are three snapshots of htop output.
>>> 1. The first snapshot is the initial state.
>>> 2. The second is after one job has been submitted.
>>> 3. The third is during that job running at 15000 EPS, and the memory
>>> usage is always increasing over time.
>>> 
>>> 
>>> 
>>> 
>>> <1.png><2.png><3.png>
>>> 
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>> 
>>> Hey Ebru,
>>> 
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>> 
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> 
>>> Hi,
>>> 
>>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>>> managers in standalone mode. Recently, we've noticed that we have memory
>>> related problems. We use Docker containers to serve the Flink cluster. We have
>>> 300 slots and 20 jobs running with a parallelism of 10. The job
>>> count may change over time. Taskmanager memory usage always increases, and after
>>> job cancelation this memory usage doesn't decrease. We've tried to
>>> investigate the problem and we've taken a task manager JVM heap snapshot.
>>> According to the JVM heap analysis, the possible memory leak was Flink's list
>>> state descriptor. But we are not sure that is the cause of our memory
>>> problem. How can we solve the problem?
>>> 
>>> 
>>> 
>> 
>> 



Re: Flink memory leak

2017-11-07 Thread Ufuk Celebi
Do you use any windowing? If yes, could you please share that code? If
there is no stateful operation at all, it's strange where the list
state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> Hi Ufuk,
>
> We don’t explicitly define any state descriptor. We only use map and filter
> operators. We thought that the GC handles clearing Flink’s internal state.
> So how can we manage the memory if it is always increasing?
>
> - Ebru
>
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>
> Hey Ebru, the memory usage might be increasing as long as a job is running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
>
> What kind of stateful operations are you using? Depending on your use case,
> you have to manually call `clear()` on the state instance in order to
> release the managed state.
>
> Best,
>
> Ufuk
>
> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
>>
>>
>>
>> Begin forwarded message:
>>
>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi <u...@apache.org>
>>
>> Hi Ufuk,
>>
>> There are three snapshots of htop output.
>> 1. The first snapshot is the initial state.
>> 2. The second is after one job has been submitted.
>> 3. The third is during that job running at 15000 EPS, and the memory
>> usage is always increasing over time.
>>
>>
>>
>>
>> <1.png><2.png><3.png>
>>
>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>
>> Hey Ebru,
>>
>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>
>> Since multiple jobs are running, it will be hard to understand to
>> which job the state descriptors from the heap snapshot belong to.
>> - Is it possible to isolate the problem and reproduce the behaviour
>> with only a single job?
>>
>> – Ufuk
>>
>>
>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>
>> Hi,
>>
>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>> managers in standalone mode. Recently, we've noticed that we have memory
>> related problems. We use Docker containers to serve the Flink cluster. We have
>> 300 slots and 20 jobs running with a parallelism of 10. The job
>> count may change over time. Taskmanager memory usage always increases, and after
>> job cancelation this memory usage doesn't decrease. We've tried to
>> investigate the problem and we've taken a task manager JVM heap snapshot.
>> According to the JVM heap analysis, the possible memory leak was Flink's list
>> state descriptor. But we are not sure that is the cause of our memory
>> problem. How can we solve the problem?
>>
>>
>>
>
>


Re: Flink memory leak

2017-11-07 Thread ebru
Hi Ufuk,

We don’t explicitly define any state descriptor. We only use map and filter 
operators. We thought that the GC handles clearing Flink’s internal state. 
So how can we manage the memory if it is always increasing?

- Ebru
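[Editorial note: Ufuk's suggestion, quoted below, about calling clear() applies to keyed managed state that a job registers itself. A hedged sketch of what that looks like; this is not taken from the jobs in this thread, which reportedly use only maps and filters, and the end-of-key marker is hypothetical:]

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical keyed function: keeps one counter per key and releases it
// with clear() once the key is finished.
public class CountPerKey extends RichFlatMapFunction<String, Long> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<Long> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        if (value.endsWith("END")) {   // hypothetical end-of-key marker
            out.collect(updated);
            count.clear();             // release the managed state for this key
        } else {
            count.update(updated);
        }
    }
}

[It would be used on a keyed stream, e.g. stream.keyBy(...).flatMap(new CountPerKey()); without the clear() call the per-key entries stay in the state backend for as long as the job runs.]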
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is running. 
> This is expected (also in the case of multiple running jobs). The screenshots 
> are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your use case, 
> you have to manually call `clear()` on the state instance in order to release 
> the managed state.
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> 
>> Begin forwarded message:
>> 
>> From: ebru <b20926...@cs.hacettepe.edu.tr 
>> <mailto:b20926...@cs.hacettepe.edu.tr>>
>> Subject: Re: Flink memory leak
>> Date: 7 November 2017 at 14:09:17 GMT+3
>> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
>> 
>> Hi Ufuk,
>> 
>> There are three snapshots of htop output.
>> 1. The first snapshot is the initial state.
>> 2. The second is after one job has been submitted.
>> 3. The third is during that job running at 15000 EPS, and the memory
>> usage is always increasing over time.
>> 
>> 
>> 
>> 
>> <1.png><2.png><3.png>
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
>>> <mailto:u...@apache.org>> wrote:
>>> 
>>> Hey Ebru,
>>> 
>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
>>> 
>>> Since multiple jobs are running, it will be hard to understand to
>>> which job the state descriptors from the heap snapshot belong to.
>>> - Is it possible to isolate the problem and reproduce the behaviour
>>> with only a single job?
>>> 
>>> – Ufuk
>>> 
>>> 
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
>>> wrote:
>>>> Hi,
>>>> 
>>>> We are using Flink 1.3.1 in production, we have one job manager and 3 task
>>>> managers in standalone mode. Recently, we've noticed that we have memory
>>>> related problems. We use Docker containers to serve the Flink cluster. We have
>>>> 300 slots and 20 jobs running with a parallelism of 10. The job
>>>> count may change over time. Taskmanager memory usage always increases, and after
>>>> job cancelation this memory usage doesn't decrease. We've tried to
>>>> investigate the problem and we've taken a task manager JVM heap snapshot.
>>>> According to the JVM heap analysis, the possible memory leak was Flink's list
>>>> state descriptor. But we are not sure that is the cause of our memory
>>>> problem. How can we solve the problem?
>> 
> 
> 



Re: Flink memory leak

2017-11-07 Thread Ufuk Celebi
Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's causing this.

Since multiple jobs are running, it will be hard to understand to
which job the state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour
with only a single job?

– Ufuk


On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
> Hi,
>
> We are using Flink 1.3.1 in production, we have one job manager and 3 task
> managers in standalone mode. Recently, we've noticed that we have memory
> related problems. We use Docker containers to serve the Flink cluster. We have
> 300 slots and 20 jobs running with a parallelism of 10. The job count
> may change over time. Taskmanager memory usage always increases, and after
> job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've taken a task manager JVM heap snapshot.
> According to the JVM heap analysis, the possible memory leak was Flink's list
> state descriptor. But we are not sure that is the cause of our memory
> problem. How can we solve the problem?