I don’t know if this is relevant to this issue, but I was constantly getting 
failures trying to reproduce this leak using your Job, because you were using 
non deterministic getKey function:
@Override
public Integer getKey(Integer event) {
   Random randomGen = new Random((new Date()).getTime());
   return randomGen.nextInt() % 8;
}
And quoting Java doc of KeySelector:

"If invoked multiple times on the same object, the returned key must be the 
same.”

I’m trying to reproduce this issue with following job:

https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 
<https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3>

Where IntegerSource is just an infinite source, DisardingSink is well just 
discarding incoming data. I’m cancelling the job every 5 seconds and so far 
(after ~15 minutes) my memory consumption is stable, well below maximum java 
heap size.

Piotrek

> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de> wrote:
> 
> Yes, I tested with just printing the stream. But it could take a lot of time 
> to fail. 
> 
> On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com 
> <mailto:pi...@data-artisans.com>> wrote:
> > Thanks for quick answer. 
> > So it will also fail after some time with `fromElements` source instead of 
> > Kafka, right? 
> > Did you try it also without a Kafka producer?
> > Piotrek
> >
> > On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de 
> > <mailto:javier.lo...@zalando.de>> wrote:
> > Hi,
> > You don't need data. With data it will die faster. I tested as well with a 
> > small data set, using the fromElements source, but it will take some time 
> > to die. It's better with some data.
> > On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com 
> > <mailto:pi...@data-artisans.com>> wrote:
> >>
> >> Hi,
> >> Thanks for sharing this job. 
> >> Do I need to feed some data to the Kafka to reproduce this issue with your 
> >> script?
> >> Does this OOM issue also happen when you are not using the Kafka 
> >> source/sink? 
> >> Piotrek
> >>
> >> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de 
> >> <mailto:javier.lo...@zalando.de>> wrote:
> >> Hi,
> >> This is the test flink job we created to trigger this leak 
> >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 
> >> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6>
> >> And this is the python script we are using to execute the job thousands of 
> >> times to get the OOM problem 
> >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 
> >> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107>
> >> The cluster we used for this has this configuration:
> >>
> >> Instance type: t2.large
> >> Number of workers: 2
> >> HeapMemory: 5500
> >> Number of task slots per node: 4
> >> TaskMangMemFraction: 0.5
> >> NumberOfNetworkBuffers: 2000
> >>
> >> We have tried several things, increasing the heap, reducing the heap, more 
> >> memory fraction, changes this value in the taskmanager.sh 
> >> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work.
> >> Thanks for your help.
> >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> >> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
> >> wrote:
> >>>
> >>> On 2017-11-08 15:20, Piotr Nowojski wrote:
> >>>>
> >>>> Hi Ebru and Javier,
> >>>>
> >>>> Yes, if you could share this example job it would be helpful.
> >>>>
> >>>> Ebru: could you explain in a little more details how does your Job(s)
> >>>> look like? Could you post some code? If you are just using maps and
> >>>> filters there shouldn’t be any network transfers involved, aside
> >>>> from Source and Sink functions.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr 
> >>>>> <mailto:b20926...@cs.hacettepe.edu.tr>> wrote:
> >>>>>
> >>>>> Hi Javier,
> >>>>>
> >>>>> It would be helpful if you share your test job with us.
> >>>>> Which configurations did you try?
> >>>>>
> >>>>> -Ebru
> >>>>>
> >>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de 
> >>>>> <mailto:javier.lo...@zalando.de>>
> >>>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> We have been facing a similar problem. We have tried some different
> >>>>> configurations, as proposed in other email thread by Flavio and
> >>>>> Kien, but it didn't work. We have a workaround similar to the one
> >>>>> that Flavio has, we restart the taskmanagers once they reach a
> >>>>> memory threshold. We created a small test to remove all of our
> >>>>> dependencies and leave only flink native libraries. This test reads
> >>>>> data from a Kafka topic and writes it back to another topic in
> >>>>> Kafka. We cancel the job and start another every 5 seconds. After
> >>>>> ~30 minutes of doing this process, the cluster reaches the OS memory
> >>>>> limit and dies.
> >>>>>
> >>>>> Currently, we have a test cluster with 8 workers and 8 task slots
> >>>>> per node. We have one job that uses 56 slots, and we cannot execute
> >>>>> that job 5 times in a row because the whole cluster dies. If you
> >>>>> want, we can publish our test job.
> >>>>>
> >>>>> Regards,
> >>>>>
> >>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org 
> >>>>> <mailto:aljos...@apache.org>>
> >>>>> wrote:
> >>>>>
> >>>>> @Nico & @Piotr Could you please have a look at this? You both
> >>>>> recently worked on the network stack and might be most familiar with
> >>>>> this.
> >>>>>
> >>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it 
> >>>>> <mailto:pomperma...@okkam.it>>
> >>>>> wrote:
> >>>>>
> >>>>> We also have the same problem in production. At the moment the
> >>>>> solution is to restart the entire Flink cluster after every job..
> >>>>> We've tried to reproduce this problem with a test (see
> >>>>> https://issues.apache.org/jira/browse/FLINK-7845 
> >>>>> <https://issues.apache.org/jira/browse/FLINK-7845> [1]) but we don't
> >>>>> know whether the error produced by the test and the leak are
> >>>>> correlated..
> >>>>>
> >>>>> Best,
> >>>>> Flavio
> >>>>>
> >>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> >>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
> >>>>> wrote:
> >>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
> >>>>> Do you use any windowing? If yes, could you please share that code?
> >>>>> If
> >>>>> there is no stateful operation at all, it's strange where the list
> >>>>> state instances are coming from.
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr 
> >>>>> <mailto:b20926...@cs.hacettepe.edu.tr>>
> >>>>> wrote:
> >>>>> Hi Ufuk,
> >>>>>
> >>>>> We don’t explicitly define any state descriptor. We only use map
> >>>>> and filters
> >>>>> operator. We thought that gc handle clearing the flink’s internal
> >>>>> states.
> >>>>> So how can we manage the memory if it is always increasing?
> >>>>>
> >>>>> - Ebru
> >>>>>
> >>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org 
> >>>>> <mailto:u...@apache.org>> wrote:
> >>>>>
> >>>>> Hey Ebru, the memory usage might be increasing as long as a job is
> >>>>> running.
> >>>>> This is expected (also in the case of multiple running jobs). The
> >>>>> screenshots are not helpful in that regard. :-(
> >>>>>
> >>>>> What kind of stateful operations are you using? Depending on your
> >>>>> use case,
> >>>>> you have to manually call `clear()` on the state instance in order
> >>>>> to
> >>>>> release the managed state.
> >>>>>
> >>>>> Best,
> >>>>>
> >>>>> Ufuk
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
> >>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
> >>>>> wrote:
> >>>>>
> >>>>> Begin forwarded message:
> >>>>>
> >>>>> From: ebru <b20926...@cs.hacettepe.edu.tr 
> >>>>> <mailto:b20926...@cs.hacettepe.edu.tr>>
> >>>>> Subject: Re: Flink memory leak
> >>>>> Date: 7 November 2017 at 14:09:17 GMT+3
> >>>>> To: Ufuk Celebi <u...@apache.org <mailto:u...@apache.org>>
> >>>>>
> >>>>> Hi Ufuk,
> >>>>>
> >>>>> There are there snapshots of htop output.
> >>>>> 1. snapshot is initial state.
> >>>>> 2. snapshot is after submitted one job.
> >>>>> 3. Snapshot is the output of the one job with 15000 EPS. And the
> >>>>> memory
> >>>>> usage is always increasing over time.
> >>>>>
> >>>>> <1.png><2.png><3.png>
> >>>>>
> >>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org 
> >>>>> <mailto:u...@apache.org>> wrote:
> >>>>>
> >>>>> Hey Ebru,
> >>>>>
> >>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing
> >>>>> this.
> >>>>>
> >>>>> Since multiple jobs are running, it will be hard to understand to
> >>>>> which job the state descriptors from the heap snapshot belong to.
> >>>>> - Is it possible to isolate the problem and reproduce the behaviour
> >>>>> with only a single job?
> >>>>>
> >>>>> – Ufuk
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
> >>>>> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> 
> >>>>> wrote:
> >>>>>
> >>>>> Hi,
> >>>>>
> >>>>> We are using Flink 1.3.1 in production, we have one job manager and
> >>>>> 3 task
> >>>>> managers in standalone mode. Recently, we've noticed that we have
> >>>>> memory
> >>>>> related problems. We use docker container to serve Flink cluster. We
> >>>>> have
> >>>>> 300 slots and 20 jobs are running with parallelism of 10. Also the
> >>>>> job
> >>>>> count
> >>>>> may be change over time. Taskmanager memory usage always increases.
> >>>>> After
> >>>>> job cancelation this memory usage doesn't decrease. We've tried to
> >>>>> investigate the problem and we've got the task manager jvm heap
> >>>>> snapshot.
> >>>>> According to the jam heap analysis, possible memory leak was Flink
> >>>>> list
> >>>>> state descriptor. But we are not sure that is the cause of our
> >>>>> memory
> >>>>> problem. How can we solve the problem?
> >>>>>
> >>>>> We have two types of Flink job. One has no state full operator
> >>>>> contains only maps and filters and the other has time window with
> >>>>> count trigger.
> >>>>
> >>>>  * We've analysed the jvm heaps again in different conditions. First
> >>>> we analysed the snapshot when no flink jobs running on cluster. (image
> >>>> 1)
> >>>> * Then, we analysed the jvm heap snapshot when the flink job that has
> >>>> no state full operator is running. And according to the results, leak
> >>>> suspect was NetworkBufferPool (image 2)
> >>>> *   Last analys, there were both two types of jobs running and leak
> >>>> suspect was again NetworkBufferPool. (image 3)
> >>>> In our system jobs are regularly cancelled and resubmitted so we
> >>>> noticed that when job is submitted some amount of memory allocated and
> >>>> after cancelation this allocated memory never freed. So over time
> >>>> memory usage is always increasing and exceeded the limits.
> >>>>
> >>>>>>
> >>>>
> >>>>
> >>>>
> >>>> Links:
> >>>> ------
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-7845 
> >>>> <https://issues.apache.org/jira/browse/FLINK-7845>
> >>>
> >>> Hi Piotr,
> >>>
> >>> There are two types of jobs.
> >>> In first, we use Kafka source and Kafka sink, there isn't any window 
> >>> operator.
> >>> In second job, we use Kafka source, filesystem sink and elastic search 
> >>> sink and window operator for buffering.
> >>
> >>
> >
> >
> >

Reply via email to