I don't know if this is relevant to this issue, but I was constantly getting failures trying to reproduce this leak using your job, because you were using a non-deterministic getKey function:

    @Override
    public Integer getKey(Integer event) {
        Random randomGen = new Random((new Date()).getTime());
        return randomGen.nextInt() % 8;
    }

And quoting the Javadoc of KeySelector: "If invoked multiple times on the same object, the returned key must be the same."
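A deterministic variant (a minimal sketch; deriving the key directly from the event value, keeping the modulus of 8 from the original, is just one way to satisfy that contract) could look like this:

    import org.apache.flink.api.java.functions.KeySelector;

    public class ModuloKeySelector implements KeySelector<Integer, Integer> {
        @Override
        public Integer getKey(Integer event) {
            // The key depends only on the event itself, so repeated calls
            // on the same element always return the same key (0..7).
            return Math.abs(event % 8);
        }
    }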
"If invoked multiple times on the same object, the returned key must be the same.” I’m trying to reproduce this issue with following job: https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3 <https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3> Where IntegerSource is just an infinite source, DisardingSink is well just discarding incoming data. I’m cancelling the job every 5 seconds and so far (after ~15 minutes) my memory consumption is stable, well below maximum java heap size. Piotrek > On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de> wrote: > > Yes, I tested with just printing the stream. But it could take a lot of time > to fail. > > On Wednesday, 8 November 2017, Piotr Nowojski <pi...@data-artisans.com > <mailto:pi...@data-artisans.com>> wrote: > > Thanks for quick answer. > > So it will also fail after some time with `fromElements` source instead of > > Kafka, right? > > Did you try it also without a Kafka producer? > > Piotrek > > > > On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de > > <mailto:javier.lo...@zalando.de>> wrote: > > Hi, > > You don't need data. With data it will die faster. I tested as well with a > > small data set, using the fromElements source, but it will take some time > > to die. It's better with some data. > > On 8 November 2017 at 14:54, Piotr Nowojski <pi...@data-artisans.com > > <mailto:pi...@data-artisans.com>> wrote: > >> > >> Hi, > >> Thanks for sharing this job. > >> Do I need to feed some data to the Kafka to reproduce this issue with your > >> script? > >> Does this OOM issue also happen when you are not using the Kafka > >> source/sink? > >> Piotrek > >> > >> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de > >> <mailto:javier.lo...@zalando.de>> wrote: > >> Hi, > >> This is the test flink job we created to trigger this leak > >> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6 > >> <https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6> > >> And this is the python script we are using to execute the job thousands of > >> times to get the OOM problem > >> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107 > >> <https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107> > >> The cluster we used for this has this configuration: > >> > >> Instance type: t2.large > >> Number of workers: 2 > >> HeapMemory: 5500 > >> Number of task slots per node: 4 > >> TaskMangMemFraction: 0.5 > >> NumberOfNetworkBuffers: 2000 > >> > >> We have tried several things, increasing the heap, reducing the heap, more > >> memory fraction, changes this value in the taskmanager.sh > >> "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to work. > >> Thanks for your help. > >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU > >> <b20926...@cs.hacettepe.edu.tr <mailto:b20926...@cs.hacettepe.edu.tr>> > >> wrote: > >>> > >>> On 2017-11-08 15:20, Piotr Nowojski wrote: > >>>> > >>>> Hi Ebru and Javier, > >>>> > >>>> Yes, if you could share this example job it would be helpful. > >>>> > >>>> Ebru: could you explain in a little more details how does your Job(s) > >>>> look like? Could you post some code? If you are just using maps and > >>>> filters there shouldn’t be any network transfers involved, aside > >>>> from Source and Sink functions. 
> >> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> >>>
> >>> On 2017-11-08 15:20, Piotr Nowojski wrote:
> >>>>
> >>>> Hi Ebru and Javier,
> >>>>
> >>>> Yes, if you could share this example job it would be helpful.
> >>>>
> >>>> Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn't be any network transfers involved, aside from Source and Sink functions.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> >>>>>
> >>>>> Hi Javier,
> >>>>> It would be helpful if you share your test job with us.
> >>>>> Which configurations did you try?
> >>>>> -Ebru
> >>>>>
> >>>>> On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de> wrote:
> >>>>>
> >>>>> Hi,
> >>>>> We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to the one that Flavio has: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only Flink native libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another every 5 seconds. After ~30 minutes of doing this process, the cluster reaches the OS memory limit and dies.
> >>>>> Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
> >>>>> Regards,
> >>>>>
> >>>>> On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>> @Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
> >>>>>
> >>>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >>>>> We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job. We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]), but we don't know whether the error produced by the test and the leak are correlated.
> >>>>> Best,
> >>>>> Flavio
> >>>>>
> >>>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> >>>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
> >>>>> Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> >>>>> Hi Ufuk,
> >>>>> We don't explicitly define any state descriptor. We only use map and filter operators. We thought that GC handles clearing Flink's internal state. So how can we manage the memory if it is always increasing?
> >>>>> - Ebru
> >>>>>
> >>>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
> >>>>> Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
> >>>>> What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
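As an illustration of that point (a generic sketch, not taken from this thread: the state name, threshold and function are made up), a function on a keyed stream that uses managed state would release it like this:

    import org.apache.flink.api.common.functions.RichFlatMapFunction;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.util.Collector;

    public class CountingFlatMap extends RichFlatMapFunction<Integer, Integer> {
        private transient ValueState<Integer> count;

        @Override
        public void open(Configuration parameters) {
            count = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("count", Integer.class));
        }

        @Override
        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            Integer current = count.value();
            int updated = (current == null ? 0 : current) + 1;
            if (updated >= 100) {
                // Release the managed state for this key once it is no longer needed.
                count.clear();
                out.collect(updated);
            } else {
                count.update(updated);
            }
        }
    }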
> >>>>> Best,
> >>>>> Ufuk
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru <b20926...@cs.hacettepe.edu.tr> wrote:
> >>>>>
> >>>>> Begin forwarded message:
> >>>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
> >>>>> Subject: Re: Flink memory leak
> >>>>> Date: 7 November 2017 at 14:09:17 GMT+3
> >>>>> To: Ufuk Celebi <u...@apache.org>
> >>>>>
> >>>>> Hi Ufuk,
> >>>>> There are three snapshots of htop output.
> >>>>> 1. snapshot is the initial state.
> >>>>> 2. snapshot is after one job was submitted.
> >>>>> 3. snapshot is the output of the one job with 15000 EPS. And the memory usage is always increasing over time.
> >>>>> <1.png><2.png><3.png>
> >>>>>
> >>>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
> >>>>> Hey Ebru,
> >>>>> let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
> >>>>> Since multiple jobs are running, it will be hard to understand to which job the state descriptors from the heap snapshot belong.
> >>>>> - Is it possible to isolate the problem and reproduce the behaviour with only a single job?
> >>>>> – Ufuk
> >>>>>
> >>>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
> >>>>> Hi,
> >>>>> We are using Flink 1.3.1 in production; we have one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory-related problems. We use Docker containers to serve the Flink cluster. We have 300 slots, and 20 jobs are running with a parallelism of 10. Also, the job count may change over time. Taskmanager memory usage always increases. After job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and we've taken a task manager JVM heap snapshot. According to the JVM heap analysis, the possible memory leak was Flink's list state descriptor. But we are not sure that is the cause of our memory problem. How can we solve the problem?
> >>>>> We have two types of Flink job. One has no stateful operator and contains only maps and filters; the other has a time window with a count trigger.
> >>>>
> >>>> * We've analysed the JVM heaps again under different conditions. First we analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
> >>>> * Then, we analysed the JVM heap snapshot when the Flink job that has no stateful operator was running. According to the results, the leak suspect was NetworkBufferPool. (image 2)
> >>>> * In the last analysis, both types of jobs were running, and the leak suspect was again NetworkBufferPool. (image 3)
> >>>> In our system, jobs are regularly cancelled and resubmitted, and we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this allocated memory is never freed. So over time memory usage keeps increasing and eventually exceeds the limits.
> >>>>
> >>>> Links:
> >>>> ------
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-7845
> >>>
> >>> Hi Piotr,
> >>> There are two types of jobs.
> >>> In the first, we use a Kafka source and a Kafka sink; there isn't any window operator.
> >>> In the second job, we use a Kafka source, a filesystem sink and an Elasticsearch sink, with a window operator for buffering.
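For illustration, a rough sketch of what such a buffering window with a count trigger can look like in the DataStream API (purely illustrative, not the actual job: the source, key selector, window size, trigger count and sink are all placeholders):

    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;

    public class WindowBufferSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Placeholder source; in the real job this would be the Kafka source.
            DataStream<Integer> events = env.fromElements(1, 2, 3, 4, 5);

            events
                .keyBy(new KeySelector<Integer, Integer>() {
                    @Override
                    public Integer getKey(Integer value) {
                        return value % 8; // placeholder deterministic key
                    }
                })
                .timeWindow(Time.seconds(10))    // buffer elements into 10-second windows
                .trigger(CountTrigger.of(1000))  // replace the default trigger: fire on element count
                .reduce((a, b) -> b)             // placeholder window function
                .print();                        // placeholder sink

            env.execute("window-buffer-sketch");
        }
    }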