On 2017-11-08 15:20, Piotr Nowojski wrote:
Hi Ebru and Javier,

Yes, if you could share this example job it would be helpful.

Ebru: could you explain in a little more detail what your job(s)
look like? Could you post some code? If you are just using maps and
filters, there shouldn’t be any network transfers involved, aside
from the Source and Sink functions.

Piotrek

On 8 Nov 2017, at 12:54, ebru <b20926...@cs.hacettepe.edu.tr> wrote:

Hi Javier,

It would be helpful if you share your test job with us.
Which configurations did you try?

-Ebru

On 8 Nov 2017, at 14:43, Javier Lopez <javier.lo...@zalando.de>
wrote:

Hi,

We have been facing a similar problem. We have tried some different
configurations, as proposed in another email thread by Flavio and
Kien, but it didn't work. We have a workaround similar to Flavio's:
we restart the taskmanagers once they reach a memory threshold. We
created a small test to remove all of our dependencies and leave
only Flink's native libraries. This test reads data from a Kafka
topic and writes it back to another topic in Kafka. We cancel the
job and start another every 5 seconds. After ~30 minutes of doing
this, the cluster reaches the OS memory limit and dies.
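
For illustration, a minimal Kafka-to-Kafka pass-through job of the
shape described above might look roughly like this (a sketch only,
assuming the Kafka 0.10 connector; the broker address, topic names
and group id are placeholders, not the actual test settings):

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class KafkaPassThroughJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092"); // placeholder broker
        props.setProperty("group.id", "leak-test");           // placeholder group id

        // Read records from one topic ...
        DataStream<String> input = env.addSource(
            new FlinkKafkaConsumer010<>("input-topic", new SimpleStringSchema(), props));

        // ... and write them back, unchanged, to another topic.
        input.addSink(
            new FlinkKafkaProducer010<>("kafka:9092", "output-topic", new SimpleStringSchema()));

        env.execute("Kafka pass-through leak test");
    }
}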

Currently, we have a test cluster with 8 workers and 8 task slots
per node. We have one job that uses 56 slots, and we cannot execute
that job 5 times in a row because the whole cluster dies. If you
want, we can publish our test job.

Regards,

On 8 November 2017 at 11:20, Aljoscha Krettek <aljos...@apache.org>
wrote:

@Nico & @Piotr Could you please have a look at this? You both
recently worked on the network stack and might be most familiar with
this.

On 8. Nov 2017, at 10:25, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

We also have the same problem in production. At the moment the
solution is to restart the entire Flink cluster after every job.
We've tried to reproduce this problem with a test (see
https://issues.apache.org/jira/browse/FLINK-7845 [1]), but we don't
know whether the error produced by the test is related to the leak.

Best,
Flavio

On Wed, Nov 8, 2017 at 9:51 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:
On 2017-11-07 16:53, Ufuk Celebi wrote:
Do you use any windowing? If yes, could you please share that code?
If
there is no stateful operation at all, it's strange where the list
state instances are coming from.

On Tue, Nov 7, 2017 at 2:35 PM, ebru <b20926...@cs.hacettepe.edu.tr>
wrote:
Hi Ufuk,

We don’t explicitly define any state descriptors. We only use map
and filter operators. We thought that GC would handle clearing
Flink’s internal state.
So how can we manage the memory if it is always increasing?

- Ebru

On 7 Nov 2017, at 16:23, Ufuk Celebi <u...@apache.org> wrote:

Hey Ebru, the memory usage might be increasing as long as a job is
running.
This is expected (also in the case of multiple running jobs). The
screenshots are not helpful in that regard. :-(

What kind of stateful operations are you using? Depending on your
use case,
you have to manually call `clear()` on the state instance in order
to
release the managed state.
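
As an illustration, clearing keyed state manually can look roughly
like the sketch below (the function, state name and threshold are
hypothetical, not taken from your job):

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key and drops the state entry once a threshold
// is reached, so the entry does not live forever.
public class CountAndClear extends RichFlatMapFunction<String, String> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
            new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long current = count.value();
        long updated = (current == null ? 0L : current) + 1;
        if (updated >= 1000) {
            out.collect(value);
            count.clear();   // releases the managed state for this key
        } else {
            count.update(updated);
        }
    }
}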

Best,

Ufuk

On Tue, Nov 7, 2017 at 12:43 PM, ebru
<b20926...@cs.hacettepe.edu.tr> wrote:

Begin forwarded message:

From: ebru <b20926...@cs.hacettepe.edu.tr>
Subject: Re: Flink memory leak
Date: 7 November 2017 at 14:09:17 GMT+3
To: Ufuk Celebi <u...@apache.org>

Hi Ufuk,

There are three snapshots of htop output.
1. The first snapshot is the initial state.
2. The second snapshot is after one job was submitted.
3. The third snapshot is the output of that job at 15000 EPS. The
memory usage keeps increasing over time.

<1.png><2.png><3.png>

On 7 Nov 2017, at 13:34, Ufuk Celebi <u...@apache.org> wrote:

Hey Ebru,

let me pull in Aljoscha (CC'd) who might have an idea what's causing
this.

Since multiple jobs are running, it will be hard to tell which job
the state descriptors from the heap snapshot belong to.
- Is it possible to isolate the problem and reproduce the behaviour
with only a single job?

– Ufuk

On Tue, Nov 7, 2017 at 10:27 AM, ÇETİNKAYA EBRU ÇETİNKAYA EBRU
<b20926...@cs.hacettepe.edu.tr> wrote:

Hi,

We are using Flink 1.3.1 in production; we have one job manager and
3 task managers in standalone mode. Recently, we've noticed that we
have memory-related problems. We use Docker containers to serve the
Flink cluster. We have 300 slots, and 20 jobs are running with a
parallelism of 10; the job count may change over time. Taskmanager
memory usage always increases, and after job cancellation it doesn't
decrease. We've tried to investigate the problem and took a JVM heap
snapshot of the task manager. According to the heap analysis, the
possible memory leak was Flink's list state descriptor, but we are
not sure that is the cause of our memory problem. How can we solve
the problem?

We have two types of Flink job. One has no stateful operators and
contains only maps and filters; the other has a time window with a
count trigger.
* We've analysed the JVM heaps again under different conditions.
First we analysed the snapshot when no Flink jobs were running on
the cluster. (image 1)
* Then, we analysed the JVM heap snapshot while the Flink job that
has no stateful operators was running. According to the results, the
leak suspect was NetworkBufferPool. (image 2)
* In the last analysis, both types of jobs were running, and the
leak suspect was again NetworkBufferPool. (image 3)
In our system, jobs are regularly cancelled and resubmitted, and we
noticed that when a job is submitted some amount of memory is
allocated, and after cancellation this memory is never freed. So
over time the memory usage keeps increasing and exceeds the limits.





Links:
------
[1] https://issues.apache.org/jira/browse/FLINK-7845
Hi Piotr,

There are two types of jobs.
In the first, we use a Kafka source and a Kafka sink; there isn't
any window operator. In the second job, we use a Kafka source, a
filesystem sink and an Elasticsearch sink, and a window operator for
buffering.
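
For reference, a minimal self-contained sketch of a window with a
count trigger used for buffering might look like the following (the
source, key, window size, trigger count and sink are placeholders,
not the actual job's settings; in the real job the source is Kafka
and the sinks are the filesystem and Elasticsearch). Note that
calling .trigger(CountTrigger.of(...)) replaces the window
assigner's default trigger.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class BufferingWindowJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder source; in the real job this would be the Kafka consumer.
        DataStream<String> events = env.socketTextStream("localhost", 9999);

        events
            .keyBy(new KeySelector<String, String>() {
                @Override
                public String getKey(String value) {
                    return value; // placeholder key extraction
                }
            })
            // Buffer per key in a one-minute processing-time window ...
            .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
            // ... but fire as soon as 1000 elements have arrived.
            .trigger(CountTrigger.of(1000))
            .apply(new WindowFunction<String, Integer, String, TimeWindow>() {
                @Override
                public void apply(String key, TimeWindow window,
                                  Iterable<String> values, Collector<Integer> out) {
                    int count = 0;
                    for (String ignored : values) {
                        count++;
                    }
                    out.collect(count); // emit the buffered count per key/window
                }
            })
            // Placeholder sink; the real job writes to the filesystem and Elasticsearch.
            .print();

        env.execute("Buffering window with count trigger");
    }
}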
