On 2017-11-10 17:50, Piotr Nowojski wrote:
I do not see anything abnormal in the logs before this error :(

What are your JVM settings and which java version are you running?
What happens if you limit the heap size so that the swap is never
used?

Piotrek

On 10 Nov 2017, at 14:57, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:

On 2017-11-10 13:14, Piotr Nowojski wrote:
jobmanager1.log and taskmanager2.log are the same. Can you also submit
files containing std output?
Piotrek
On 10 Nov 2017, at 09:35, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-10 11:04, Piotr Nowojski wrote:
Hi,
Thanks for the logs, however I do not see the aforementioned exceptions
in them. The log ends with java.lang.InterruptedException.
Is it the correct log file? Also, could you attach the std output file
of the failing TaskManager?
Piotrek
On 10 Nov 2017, at 08:42, ÇETİNKAYA EBRU ÇETİNKAYA EBRU <b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-09 20:08, Piotr Nowojski wrote:
Hi,
Could you attach full logs from those task managers? At first glance I don’t see a connection between those exceptions and any memory issue that you might have had. It looks like a dependency issue in one (some? all?) of your jobs.
Did you build your jars with the -Pbuild-jar profile, as described here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/java_api_quickstart.html#build-project
?
If that doesn’t help, can you binary search which job is causing the problem? There might be some Flink incompatibility between different versions, and rebuilding a job’s jar with a version matching the cluster version might help.
Piotrek
On 9 Nov 2017, at 17:36, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-08 18:30, Piotr Nowojski wrote:
Btw, Ebru:
I don’t agree that the main suspect is NetworkBufferPool. On your screenshots its memory consumption was reasonable and stable: 596MB -> 602MB -> 597MB.
PoolThreadCache memory usage of ~120MB is also reasonable.
Do you experience any problems, like Out Of Memory errors, crashes, or long GC pauses? Or is the JVM process just using more memory over time? You are aware that the JVM doesn’t like to release memory back to the OS once it has been used? So increasing memory usage until hitting some limit (for example the JVM max heap size) is expected behaviour.
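To tell this expected growth apart from a real leak, it can help to compare the used heap with the committed and maximum heap. A minimal sketch using only java.lang.Runtime; the HeapReport class and its snapshot() helper are hypothetical names made up for illustration, and the numbers cover the Java heap only, not direct or native memory:

```java
// Sketch: report how much heap the JVM uses versus how much it has reserved.
class HeapReport {
    static final long MB = 1024L * 1024L;

    // Returns {used, committed, max} in bytes, as reported by java.lang.Runtime.
    static long[] snapshot() {
        Runtime rt = Runtime.getRuntime();
        long committed = rt.totalMemory();        // heap currently reserved by the JVM
        long used = committed - rt.freeMemory();  // part of it occupied by objects
        long max = rt.maxMemory();                // upper bound the JVM will try to use
        return new long[] {used, committed, max};
    }

    public static void main(String[] args) {
        long[] s = snapshot();
        System.out.printf("used=%dMB committed=%dMB max=%dMB%n",
                s[0] / MB, s[1] / MB, s[2] / MB);
    }
}
```

If "committed" climbs towards "max" while "used" keeps dropping back after garbage collection, the heap itself is probably behaving normally and the growth seen in htop would have to come from somewhere else.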
Piotrek
On 8 Nov 2017, at 15:48, Piotr Nowojski <pi...@data-artisans.com>
wrote:
I don’t know if this is relevant to this issue, but I was constantly getting failures trying to reproduce this leak using your Job, because you were using a non-deterministic getKey function:
@Override
public Integer getKey(Integer event) {
Random randomGen = new Random((new Date()).getTime());
return randomGen.nextInt() % 8;
}
And quoting the Javadoc of KeySelector:
"If invoked multiple times on the same object, the returned key must be the same."
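The KeySelector contract quoted above can be met by deriving the key from the event alone. A minimal sketch, not taken from the job in this thread: the local KeySelector interface is only a stand-in for Flink's org.apache.flink.api.java.functions.KeySelector so that the snippet compiles without Flink on the classpath, and ModuloKeySelector and KeySelectorDemo are hypothetical names.

```java
// Stand-in for Flink's KeySelector interface (assumption: same single-method shape).
interface KeySelector<IN, KEY> {
    KEY getKey(IN value) throws Exception;
}

// Deterministic selector: the key depends only on the event itself,
// so repeated calls on the same event always return the same key.
class ModuloKeySelector implements KeySelector<Integer, Integer> {
    private static final int BUCKETS = 8;

    @Override
    public Integer getKey(Integer event) {
        // Math.floorMod keeps the result in [0, 7] even for negative events,
        // whereas the % operator can return negative values.
        return Math.floorMod(event, BUCKETS);
    }
}

class KeySelectorDemo {
    public static void main(String[] args) {
        ModuloKeySelector selector = new ModuloKeySelector();
        System.out.println(selector.getKey(13)); // prints 5
        System.out.println(selector.getKey(-3)); // prints 5
    }
}
```

A selector seeded from the current time, as in the snippet above, can return a different key each time it is called for the same element, which is what the contract forbids.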
I’m trying to reproduce this issue with the following job:
https://gist.github.com/pnowojski/b80f725c1af7668051c773438637e0d3
where IntegerSource is just an infinite source and DiscardingSink simply discards incoming data. I’m cancelling the job every 5 seconds and so far (after ~15 minutes) my memory consumption is stable, well below the maximum Java heap size.
Piotrek
On 8 Nov 2017, at 15:28, Javier Lopez <javier.lo...@zalando.de>
wrote:
Yes, I tested with just printing the stream. But it could take a
lot of time to fail.
On Wednesday, 8 November 2017, Piotr Nowojski
<pi...@data-artisans.com> wrote:
Thanks for the quick answer.
So it will also fail after some time with `fromElements` source
instead of Kafka, right?
Did you try it also without a Kafka producer?
Piotrek
On 8 Nov 2017, at 14:57, Javier Lopez <javier.lo...@zalando.de>
wrote:
Hi,
You don't need data. With data it will die faster. I tested as
well with a small data set, using the fromElements source, but it
will take some time to die. It's better with some data.
On 8 November 2017 at 14:54, Piotr Nowojski
<pi...@data-artisans.com> wrote:
Hi,
Thanks for sharing this job.
Do I need to feed some data to the Kafka to reproduce this
issue with your script?
Does this OOM issue also happen when you are not using the
Kafka source/sink?
Piotrek
On 8 Nov 2017, at 14:08, Javier Lopez <javier.lo...@zalando.de>
wrote:
Hi,
This is the test flink job we created to trigger this leak
https://gist.github.com/javieredo/c6052404dbe6cc602e99f4669a09f7d6
And this is the python script we are using to execute the job
thousands of times to get the OOM problem
https://gist.github.com/javieredo/4825324d5d5f504e27ca6c004396a107
The cluster we used for this has this configuration:
Instance type: t2.large
Number of workers: 2
HeapMemory: 5500
Number of task slots per node: 4
TaskMangMemFraction: 0.5
NumberOfNetworkBuffers: 2000
We have tried several things: increasing the heap, reducing the heap, a larger memory fraction, and changing this value in taskmanager.sh: TM_MAX_OFFHEAP_SIZE="2G". Nothing seems to work.
Thanks for your help.
On 8 November 2017 at 13:26, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-08 15:20, Piotr Nowojski wrote:
Hi Ebru and Javier,
Yes, if you could share this example job it would be helpful.
Ebru: could you explain in a little more detail what your job(s) look like? Could you post some code? If you are just using maps and filters there shouldn’t be any network transfers involved, aside from Source and Sink functions.
Piotrek
On 8 Nov 2017, at 12:54, ebru
<b20926...@cs.hacettepe.edu.tr> wrote:
Hi Javier,
It would be helpful if you share your test job with us.
Which configurations did you try?
-Ebru
On 8 Nov 2017, at 14:43, Javier Lopez
<javier.lo...@zalando.de>
wrote:
Hi,
We have been facing a similar problem. We have tried some different configurations, as proposed in another email thread by Flavio and Kien, but it didn't work. We have a workaround similar to the one that Flavio has: we restart the taskmanagers once they reach a memory threshold. We created a small test to remove all of our dependencies and leave only Flink native libraries. This test reads data from a Kafka topic and writes it back to another topic in Kafka. We cancel the job and start another every 5 seconds. After ~30 minutes of doing this, the cluster reaches the OS memory limit and dies.
Currently, we have a test cluster with 8 workers and 8 task slots per node. We have one job that uses 56 slots, and we cannot execute that job 5 times in a row because the whole cluster dies. If you want, we can publish our test job.
Regards,
On 8 November 2017 at 11:20, Aljoscha Krettek
<aljos...@apache.org>
wrote:
@Nico & @Piotr Could you please have a look at this? You both recently worked on the network stack and might be most familiar with this.
On 8. Nov 2017, at 10:25, Flavio Pompermaier
<pomperma...@okkam.it>
wrote:
We also have the same problem in production. At the moment the solution is to restart the entire Flink cluster after every job. We've tried to reproduce this problem with a test (see https://issues.apache.org/jira/browse/FLINK-7845 [1]) but we don't know whether the error produced by the test and the leak are correlated.
Best,
Flavio
On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA
EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any windowing? If yes, could you please share that code? If there is no stateful operation at all, it's strange where the list state instances are coming from.
On Tue, Nov 7, 2017 at 2:35 PM, ebru
<b20926...@cs.hacettepe.edu.tr>
wrote:
Hi Ufuk,
We don’t explicitly define any state descriptor. We only use map and filter operators. We thought that GC handles clearing Flink’s internal states.
So how can we manage the memory if it is always increasing?
- Ebru
On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:
Hey Ebru, the memory usage might be increasing as long as a job is running. This is expected (also in the case of multiple running jobs). The screenshots are not helpful in that regard. :-(
What kind of stateful operations are you using? Depending on your use case, you have to manually call `clear()` on the state instance in order to release the managed state.
Best,
Ufuk
On Tue, Nov 7, 2017 at 12:43 PM, ebru
<b20926...@cs.hacettepe.edu.tr> wrote:
Begin forwarded message:
From: ebru <b20926...@cs.hacettepe.edu.tr>
Subject: Re: Flink memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi <u...@apache.org>
Hi Ufuk,
There are three snapshots of htop output.
1. The first snapshot is the initial state.
2. The second snapshot is after submitting one job.
3. The third snapshot is the output of the one job with 15000 EPS. The memory usage is always increasing over time.
<1.png><2.png><3.png>
On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:
Hey Ebru,
let me pull in Aljoscha (CC'd) who might have an idea what's causing this.
Since multiple jobs are running, it will be hard to understand which job the state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour with only a single job?
– Ufuk
On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU
ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
Hi,
We are using Flink 1.3.1 in production; we have one job manager and 3 task managers in standalone mode. Recently, we've noticed that we have memory-related problems. We use Docker containers to serve the Flink cluster. We have 300 slots, and 20 jobs are running with a parallelism of 10. The job count may also change over time. Taskmanager memory usage always increases. After job cancellation this memory usage doesn't decrease. We've tried to investigate the problem and we've taken a heap snapshot of the task manager JVM. According to the JVM heap analysis, the possible memory leak was the Flink list state descriptor. But we are not sure that is the cause of our memory problem. How can we solve the problem?
We have two types of Flink job. One has no stateful operators and contains only maps and filters; the other has a time window with a count trigger.
* We've analysed the JVM heaps again under different conditions. First we analysed the snapshot when no Flink jobs were running on the cluster. (image 1)
* Then we analysed the JVM heap snapshot when the Flink job that has no stateful operators was running. According to the results, the leak suspect was NetworkBufferPool. (image 2)
* In the last analysis, both types of jobs were running and the leak suspect was again NetworkBufferPool. (image 3)
In our system jobs are regularly cancelled and resubmitted, so we noticed that when a job is submitted some amount of memory is allocated, and after cancellation this allocated memory is never freed. So over time memory usage keeps increasing and exceeds the limits.
Links:
------
[1] https://issues.apache.org/jira/browse/FLINK-7845
Hi Piotr,
There are two types of jobs.
In the first, we use a Kafka source and a Kafka sink; there isn't any window operator.
In the second job, we use a Kafka source, a filesystem sink, an Elasticsearch sink, and a window operator for buffering.
Hi Piotrek,
Thanks for your reply.
We've tested our Flink cluster again. We have 360 slots, and our
cluster configuration is like this:
jobmanager.rpc.address: %JOBMANAGER%
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1536
taskmanager.heap.mb: 1536
taskmanager.numberOfTaskSlots: 120
taskmanager.memory.preallocate: false
parallelism.default: 1
jobmanager.web.port: 8081
state.backend: filesystem
state.backend.fs.checkpointdir: file:///storage/%CHECKPOINTDIR%
state.checkpoints.dir: file:///storage/%CHECKPOINTDIR%
taskmanager.network.numberOfBuffers: 5000
We are using a Docker-based Flink cluster.
We submitted 36 jobs with a parallelism of 10. After all slots became full, memory usage kept increasing over time and the task managers started to die one by one. The exception was like this:
Taskmanager1 log:
Uncaught error from thread [flink-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.NoClassDefFoundError: org/apache/kafka/common/metrics/stats/Rate$1
  at org.apache.kafka.common.metrics.stats.Rate.convert(Rate.java:93)
  at org.apache.kafka.common.metrics.stats.Rate.measure(Rate.java:62)
  at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
  at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
  at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
  at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
  at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
  at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
  at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
  at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.metrics.stats.Rate$1
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 22 more
Taskmanager2 log:
Uncaught error from thread [flink-akka.actor.default-dispatcher-17] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[flink]
java.lang.NoClassDefFoundError: org/apache/flink/streaming/connectors/kafka/internals/AbstractFetcher$1
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:492)
  at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$OffsetGauge.getValue(AbstractFetcher.java:480)
  at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.serializeGauge(MetricDumpSerialization.java:213)
  at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.access$200(MetricDumpSerialization.java:50)
  at org.apache.flink.runtime.metrics.dump.MetricDumpSerialization$MetricDumpSerializer.serialize(MetricDumpSerialization.java:138)
  at org.apache.flink.runtime.metrics.dump.MetricQueryService.onReceive(MetricQueryService.java:109)
  at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
  at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
  at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:97)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
  at akka.actor.ActorCell.invoke(ActorCell.scala:487)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
  at akka.dispatch.Mailbox.run(Mailbox.scala:220)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher$1
  at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 18 more
-Ebru
Hi Piotrek,
We attached the full log of taskmanager1.
This may not be a dependency issue, because until all of the task slots were full we didn't get any NoClassDefFoundError; when there is available memory, jobs can run without exceptions for days. There is also a Kafka "Instance Already Exists" exception in the full log, but this is not relevant and doesn't affect jobs or task managers.
-Ebru<taskmanager1.log.zip>
Hi,
Sorry, we attached the wrong log file. I've attached the logs of all task managers and the job manager. All task managers and the job manager were killed.<logs.zip>

We lost the std output files, so we've reproduced the problem. I attached the task manager and job manager logs and also the std output files. After some time it starts using swap; the screenshot of the htop output is also attached.<logs2-1.zip><logs2-2.zip><logs2-3.zip><error2.png>
We use these JVM options:
-XX:+UseG1GC \
-XX:+UseStringDeduplication \
java version "1.8.0_131"
How can we limit the heap size? We've already set the jobmanager.heap.mb and taskmanager.heap.mb configs to 1536.
Did you mean limiting the Docker containers' heap size?
