Hi,

Thanks for sharing this job. 

Do I need to feed some data to the Kafka to reproduce this issue with your 
script?

Does this OOM issue also happen when you are not using the Kafka source/sink? 

Piotrek

> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Hi,
> 
> This is the test flink job we created to trigger this leak 
> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
> And this is the python script we are using to execute the job thousands of 
> times to get the OOM problem 
> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
> 
> The cluster we used for this has this configuration:
> Instance type: t2.large
> Number of workers: 2
> HeapMemory: 5500
> Number of task slots per node: 4
> TaskMangMemFraction: 0.5
> NumberOfNetworkBuffers: 2000
> We have tried several things, increasing the heap, reducing the heap, more 
> memory fraction, changes this value in the taskmanager.sh 
> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
> 
> Thanks for your help.
> 
> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> On 2017-11-08 15:20, Piotr Nowojski wrote:
> Hi Ebru and Javier,
> 
> Yes, if you could share this example job it would be helpful.
> 
> Ebru: could you explain in a little more details how does your Job(s)
> look like? Could you post some code? If you are just using maps and
> filters there shouldn’t be any network transfers involved, aside
> from Source and Sink functions.
> 
> Piotrek
> 
> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Hi Javier,
> 
> It would be helpful if you share your test job with us.
> Which configurations did you try?
> 
> -Ebru
> 
> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
> <mailto:javier.lo...@zalando.de>>
> wrote:
> 
> Hi,
> 
> We have been facing a similar problem. We have tried some different
> configurations, as proposed in other email thread by Flavio and
> Kien, but it didn't work. We have a workaround similar to the one
> that Flavio has, we restart the taskmanagers once they reach a
> memory threshold. We created a small test to remove all of our
> dependencies and leave only flink native libraries. This test reads
> data from a Kafka topic and writes it back to another topic in
> Kafka. We cancel the job and start another every 5 seconds. After
> ~30 minutes of doing this process, the cluster reaches the OS memory
> limit and dies.
> 
> Currently, we have a test cluster with 8 workers and 8 task slots
> per node. We have one job that uses 56 slots, and we cannot execute
> that job 5 times in a row because the whole cluster dies. If you
> want, we can publish our test job.
> 
> Regards,
> 
> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
> <mailto:aljos...@apache.org>>
> wrote:
> 
> @Nico & @Piotr Could you please have a look at this? You both
> recently worked on the network stack and might be most familiar with
> this.
> 
> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
> <mailto:pomperma...@okkam.it>>
> wrote:
> 
> We also have the same problem in production. At the moment the
> solution is to restart the entire Flink cluster after every job..
> We've tried to reproduce this problem with a test (see
> https://issues.apache.org/jira/browse/FLINK-7845 
> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we don't
> 
> know whether the error produced by the test and the leak are
> correlated..
> 
> Best,
> Flavio
> 
> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> On 2017-11-07 16:53, Ufuk Celebi wrote:
> Do you use any windowing? If yes, could you please share that code?
> If
> there is no stateful operation at all, it's strange where the list
> state instances are coming from.
> 
> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>>
> wrote:
> Hi Ufuk,
> 
> We don’t explicitly define any state descriptor. We only use map
> and filters
> operator. We thought that gc handle clearing the flink’s internal
> states.
> So how can we manage the memory if it is always increasing?
> 
> - Ebru
> 
> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> 
> Hey Ebru, the memory usage might be increasing as long as a job is
> running.
> This is expected (also in the case of multiple running jobs). The
> screenshots are not helpful in that regard. :-(
> 
> What kind of stateful operations are you using? Depending on your
> use case,
> you have to manually call `clear()` on the state instance in order
> to
> release the managed state.
> 
> Best,
> 
> Ufuk
> 
> On Tue, Nov 7, 2017 at 12:43 PM, ebru
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Begin forwarded message:
> 
> From: ebru <b20926...@cs.hacettepe.edu.tr 
> <mailto:b20926...@cs.hacettepe.edu.tr>>
> Subject: Re: Flink memory leak
> Date: 7 November 2017 at 14:09:17 GMT+3
> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
> 
> Hi Ufuk,
> 
> There are there snapshots of htop output.
> 1. snapshot is initial state.
> 2. snapshot is after submitted one job.
> 3. Snapshot is the output of the one job with 15000 EPS. And the
> memory
> usage is always increasing over time.
> 
> <1.png><2.png><3.png>
> 
> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
> <mailto:u...@apache.org>> wrote:
> 
> Hey Ebru,
> 
> let me pull in Aljoscha (CC'd) who might have an idea what's causing
> this.
> 
> Since multiple jobs are running, it will be hard to understand to
> which job the state descriptors from the heap snapshot belong to.
> - Is it possible to isolate the problem and reproduce the behaviour
> with only a single job?
> 
> – Ufuk
> 
> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> 
> Hi,
> 
> We are using Flink 1.3.1 in production, we have one job manager and
> 3 task
> managers in standalone mode. Recently, we've noticed that we have
> memory
> related problems. We use docker container to serve Flink cluster. We
> have
> 300 slots and 20 jobs are running with parallelism of 10. Also the
> job
> count
> may be change over time. Taskmanager memory usage always increases.
> After
> job cancelation this memory usage doesn't decrease. We've tried to
> investigate the problem and we've got the task manager jvm heap
> snapshot.
> According to the jam heap analysis, possible memory leak was Flink
> list
> state descriptor. But we are not sure that is the cause of our
> memory
> problem. How can we solve the problem?
> 
> We have two types of Flink job. One has no state full operator
> contains only maps and filters and the other has time window with
> count trigger.
>  * We've analysed the jvm heaps again in different conditions. First
> we analysed the snapshot when no flink jobs running on cluster. (image
> 1)
> * Then, we analysed the jvm heap snapshot when the flink job that has
> no state full operator is running. And according to the results, leak
> suspect was NetworkBufferPool (image 2)
> *   Last analys, there were both two types of jobs running and leak
> suspect was again NetworkBufferPool. (image 3)
> In our system jobs are regularly cancelled and resubmitted so we
> noticed that when job is submitted some amount of memory allocated and
> after cancelation this allocated memory never freed. So over time
> memory usage is always increasing and exceeded the limits.
> 
> 
> 
> 
> 
> Links:
> ------
> [1] https://issues.apache.org/jira/browse/FLINK-7845 
> <https://issues.apache.org/jira/browse/FLINK-7845>
> Hi Piotr,
> 
> There are two types of jobs.
> In first, we use Kafka source and Kafka sink, there isn't any window operator.
> In second job, we use Kafka source, filesystem sink and elastic search sink 
> and window operator for buffering.
> 

Reply via email to