Hi,

Thanks for the logs, however I do not see before mentioned exceptions in it. It 
ends with java.lang.InterruptedException

Is it the correct log file? Also, could you attach the std output file of the 
failing TaskManager?

Piotrek

> On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU 
> <b20926...@cs.hacettepe.edu.tr> wrote:
> 
> On 2017-11-09 20:08, Piotr Nowojski wrote:
>> Hi,
>> Could you attach full logs from those task managers? At first glance I
>> don’t see a connection between those exceptions and any memory issue
>> that you might had. It looks like a dependency issue in one (some?
>> All?) of your jobs.
>> Did you build your jars with -Pbuild-jar profile as described here:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
>> ?
>> If that doesn’t help. Can you binary search which job is causing the
>> problem? There might be some Flink incompatibility between different
>> versions and rebuilding a job’s jar with a version matching to the
>> cluster version might help.
>> Piotrek
>>> On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-08 18:30, Piotr Nowojski wrote:
>>> Btw, Ebru:
>>> I don’t agree that the main suspect is NetworkBufferPool. On your
>>> screenshots it’s memory consumption was reasonable and stable:
>>> 596MB
>>> -> 602MB -> 597MB.
>>> PoolThreadCache memory usage ~120MB is also reasonable.
>>> Do you experience any problems, like Out Of Memory
>>> errors/crashes/long
>>> GC pauses? Or just JVM process is using more memory over time? You
>>> are
>>> aware that JVM doesn’t like to release memory back to OS once it
>>> was
>>> used? So increasing memory usage until hitting some limit (for
>>> example
>>> JVM max heap size) is expected behaviour.
>>> Piotrek
>>> On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
>>> wrote:
>>> I don’t know if this is relevant to this issue, but I was
>>> constantly getting failures trying to reproduce this leak using your
>>> Job, because you were using non deterministic getKey function:
>>> @Override
>>> public Integer getKey(Integer event) {
>>> Random randomGen = new Random((new Date()).getTime());
>>> return randomGen.nextInt() % 8;
>>> }
>>> And quoting Java doc of KeySelector:
>>> "If invoked multiple times on the same object, the returned key must
>>> be the same.”
>>> I’m trying to reproduce this issue with following job:
>>> https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
>>> Where IntegerSource is just an infinite source, DisardingSink is
>>> well just discarding incoming data. I’m cancelling the job every 5
>>> seconds and so far (after ~15 minutes) my memory consumption is
>>> stable, well below maximum java heap size.
>>> Piotrek
>>> On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de>
>>> wrote:
>>> Yes, I tested with just printing the stream. But it could take a
>>> lot of time to fail.
>>> On Wednesday, 8 November 2017, Piotr Nowojski
>>> <pi...@data-artisans.com> wrote:
>>> Thanks for quick answer.
>>> So it will also fail after some time with `fromElements` source
>>> instead of Kafka, right?
>>> Did you try it also without a Kafka producer?
>>> Piotrek
>>> On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de>
>>> wrote:
>>> Hi,
>>> You don't need data. With data it will die faster. I tested as
>>> well with a small data set, using the fromElements source, but it
>>> will take some time to die. It's better with some data.
>>> On 8 November 2017 at 14:54, Piotr Nowojski
>>> <pi...@data-artisans.com> wrote:
>>> Hi,
>>> Thanks for sharing this job.
>>> Do I need to feed some data to the Kafka to reproduce this
>> issue with your script?
>>>> Does this OOM issue also happen when you are not using the
>> Kafka source/sink?
>>>> Piotrek
>>>> On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
>> wrote:
>>>> Hi,
>>>> This is the test flink job we created to trigger this leak
>> https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
>>>> And this is the python script we are using to execute the job
>> thousands of times to get the OOM problem
>> https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
>>>> The cluster we used for this has this configuration:
>>>> Instance type: t2.large
>>>> Number of workers: 2
>>>> HeapMemory: 5500
>>>> Number of task slots per node: 4
>>>> TaskMangMemFraction: 0.5
>>>> NumberOfNetworkBuffers: 2000
>>>> We have tried several things, increasing the heap, reducing the
>> heap, more memory fraction, changes this value in the
>> taskmanager.sh "TM_MAX_OFFHEAP_SIZE="2G"; and nothing seems to
>> work.
>>>> Thanks for your help.
>>>> On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-08 15:20, Piotr Nowojski wrote:
>>> Hi Ebru and Javier,
>>> Yes, if you could share this example job it would be helpful.
>>> Ebru: could you explain in a little more details how does
>> your Job(s)
>>> look like? Could you post some code? If you are just using
>> maps and
>>> filters there shouldn’t be any network transfers involved,
>> aside
>>> from Source and Sink functions.
>>> Piotrek
>>> On 8 Nov 2017, at 12:54, ebru
>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> Hi Javier,
>>> It would be helpful if you share your test job with us.
>>> Which configurations did you try?
>>> -Ebru
>>> On 8 Nov 2017, at 14:43, Javier Lopez
>> <javier.lo...@zalando.de>
>>> wrote:
>>> Hi,
>>> We have been facing a similar problem. We have tried some
>> different
>>> configurations, as proposed in other email thread by Flavio
>> and
>>> Kien, but it didn't work. We have a workaround similar to
>> the one
>>> that Flavio has, we restart the taskmanagers once they reach
>> a
>>> memory threshold. We created a small test to remove all of
>> our
>>> dependencies and leave only flink native libraries. This
>> test reads
>>> data from a Kafka topic and writes it back to another topic
>> in
>>> Kafka. We cancel the job and start another every 5 seconds.
>> After
>>> ~30 minutes of doing this process, the cluster reaches the
>> OS memory
>>> limit and dies.
>>> Currently, we have a test cluster with 8 workers and 8 task
>> slots
>>> per node. We have one job that uses 56 slots, and we cannot
>> execute
>>> that job 5 times in a row because the whole cluster dies. If
>> you
>>> want, we can publish our test job.
>>> Regards,
>>> On 8 November 2017 at 11:20, Aljoscha Krettek
>> <aljos...@apache.org>
>>> wrote:
>>> @Nico & @Piotr Could you please have a look at this? You
>> both
>>> recently worked on the network stack and might be most
>> familiar with
>>> this.
>>> On 8. Nov 2017, at 10:25, Flavio Pompermaier
>> <pomperma...@okkam.it>
>>> wrote:
>>> We also have the same problem in production. At the moment
>> the
>>> solution is to restart the entire Flink cluster after every
>> job..
>>> We've tried to reproduce this problem with a test (see
>>> https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we
>> don't
>>> know whether the error produced by the test and the leak are
>>> correlated..
>>> Best,
>>> Flavio
>>> On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
>> EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> On 2017-11-07 16:53, Ufuk Celebi wrote:
>>> Do you use any windowing? If yes, could you please share
>> that code?
>>> If
>>> there is no stateful operation at all, it's strange where
>> the list
>>> state instances are coming from.
>>> On Tue, Nov 7, 2017 at 2:35 PM, ebru
>> <b20926...@cs.hacettepe.edu.tr>
>>> wrote:
>>> Hi Ufuk,
>>> We don’t explicitly define any state descriptor. We only
>> use map
>>> and filters
>>> operator. We thought that gc handle clearing the flink’s
>> internal
>>> states.
>>> So how can we manage the memory if it is always increasing?
>>> - Ebru
>>> On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
>>> Hey Ebru, the memory usage might be increasing as long as a
>> job is
>>> running.
>>> This is expected (also in the case of multiple running
>> jobs). The
>>> screenshots are not helpful in that regard. :-(
>>> What kind of stateful operations are you using? Depending on
>> your
>>> use case,
>>> you have to manually call `clear()` on the state instance in
>> order
>>> to
>>> release the managed state.
>>> Best,
>>> Ufuk
>>> On Tue, Nov 7, 2017 at 12:43 PM, ebru
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> Begin forwarded message:
>>> From: ebru <b20926...@cs.hacettepe.edu.tr>
>>> Subject: Re: Flink memory leak
>>> Date: 7 November 2017 at 14:09:17 GMT+3
>>> To: Ufuk Celebi <u...@apache.org>
>>> Hi Ufuk,
>>> There are there snapshots of htop output.
>>> 1. snapshot is initial state.
>>> 2. snapshot is after submitted one job.
>>> 3. Snapshot is the output of the one job with 15000 EPS. And
>> the
>>> memory
>>> usage is always increasing over time.
>>> <1.png><2.png><3.png>
>>> On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
>>> Hey Ebru,
>>> let me pull in Aljoscha (CC'd) who might have an idea what's
>> causing
>>> this.
>>> Since multiple jobs are running, it will be hard to
>> understand to
>>> which job the state descriptors from the heap snapshot
>> belong to.
>>> - Is it possible to isolate the problem and reproduce the
>> behaviour
>>> with only a single job?
>>> – Ufuk
>>> On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU
>> ÇETİNKAYA EBRU
>>> <b20926...@cs.hacettepe.edu.tr> wrote:
>>> Hi,
>>> We are using Flink 1.3.1 in production, we have one job
>> manager and
>>> 3 task
>>> managers in standalone mode. Recently, we've noticed that we
>> have
>>> memory
>>> related problems. We use docker container to serve Flink
>> cluster. We
>>> have
>>> 300 slots and 20 jobs are running with parallelism of 10.
>> Also the
>>> job
>>> count
>>> may be change over time. Taskmanager memory usage always
>> increases.
>>> After
>>> job cancelation this memory usage doesn't decrease. We've
>> tried to
>>> investigate the problem and we've got the task manager jvm
>> heap
>>> snapshot.
>>> According to the jam heap analysis, possible memory leak was
>> Flink
>>> list
>>> state descriptor. But we are not sure that is the cause of
>> our
>>> memory
>>> problem. How can we solve the problem?
>>> We have two types of Flink job. One has no state full
>> operator
>>> contains only maps and filters and the other has time window
>> with
>>> count trigger.
>>> * We've analysed the jvm heaps again in different
>> conditions. First
>>> we analysed the snapshot when no flink jobs running on
>> cluster. (image
>>> 1)
>>> * Then, we analysed the jvm heap snapshot when the flink job
>> that has
>>> no state full operator is running. And according to the
>> results, leak
>>> suspect was NetworkBufferPool (image 2)
>>> *   Last analys, there were both two types of jobs running
>> and leak
>>> suspect was again NetworkBufferPool. (image 3)
>>> In our system jobs are regularly cancelled and resubmitted so
>> we
>>> noticed that when job is submitted some amount of memory
>> allocated and
>>> after cancelation this allocated memory never freed. So over
>> time
>>> memory usage is always increasing and exceeded the limits.
>> Links:
>> ------
>> [1] https://issues.apache.org/jira/browse/FLINK-7845
>> Hi Piotr,
>> There are two types of jobs.
>> In first, we use Kafka source and Kafka sink, there isn't any
>> window operator.
>>> In second job, we use Kafka source, filesystem sink and
>> elastic search sink and window operator for buffering.
>> Hi Piotrek,
>> Thanks for your reply.
>> We've tested our link cluster again. We have 360 slots, and our
>> cluster configuration is like this;
>> jobmanager.rpc.address: %JOBMANAGER%
>> jobmanager.rpc.port: 6123
>> jobmanager.heap.mb: 1536
>> taskmanager.heap.mb: 1536
>> taskmanager.numberOfTaskSlots: 120
>> taskmanager.memory.preallocate: false
>> parallelism.default: 1
>> jobmanager.web.port: 8081
>> state.backend: filesystem
>> state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
>> state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
>> taskmanager.network.numberOfBuffers: 5000
>> We are using docker based Flink cluster.
>> WE submitted 36 jobs with the parallelism of 10. After all slots
>> became full. Memory usage were increasing by the time and one by one
>> task managers start to die. And the exception was like this;
>> Taskmanager1 log:
>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
>> ActorSystem[flink]
>> java.lang.NoClassDefFoundError:
>> org/apache/kafka/common/metrics/stats/Rate$1
>>   at
>> org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
>>   at
>> org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
>>   at
>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
>>   at
>> org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
>>   at
>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
>>   at
>> org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>   at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>   at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>   at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.kafka.common.metrics.stats.Rate$1
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>   ... 22 more
>> Taskmanager2 log:
>> Uncaught error from thread [flink-akka.actor.default-dispatcher-17]
>> shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for
>> ActorSystem[flink]
>> Java.lang.NoClassDefFoundError:
>> org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
>>   at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
>>   at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
>>   at
>> org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
>>   at
>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>>   at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
>>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>>   at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>>   at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>>   at
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
>>   at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>   at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>   at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> Caused by: java.lang.ClassNotFoundException:
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
>>   at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>   at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>   ... 18 more
>> -Ebru
> Hi Piotrek,
> 
> We attached the full log of the taskmanager1.
> This may not be a dependency issue because until all of the task slots is 
> full, we didn't get any No Class Def Found exception, when there is available 
> memory jobs can run without exception for days.
> Also there is Kafka Instance Already Exist exception in full log, but this 
> not relevant and doesn't effect jobs or task managers.
> 
> -Ebru<taskmanager1.log.zip>

Reply via email to