Fwd: Flink memory usage monitoring

2020-10-27 Thread Matthias Pohl
I missed adding the mailing list in my previous email.

-- Forwarded message -
From: Matthias Pohl 
Date: Tue, Oct 27, 2020 at 12:39 PM
Subject: Re: Flink memory usage monitoring
To: Rajesh Payyappilly Jose 


Hi Rajesh,
thanks for reaching out to us. We worked on providing metrics for managed
memory and network memory as part of FLIP-102 [1]. It looks like these
features are going to be added to the upcoming release of Flink 1.12.

We decided to not include off-heap memory as it is not necessarily under
control of Flink (e.g. user code can allocate native memory and Flink
wouldn't be aware of it). Hence, providing numbers for off-heap memory
usage might be misleading. There will be a metric to monitor the Metaspace
usage, though.

Best,
Matthias

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A+Add+More+Metrics+to+TaskManager
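
For anyone who wants to script this once 1.12 is out: the new metrics should be
readable through the usual REST metrics endpoint. Below is a minimal sketch, not
from the original thread; the metric names (Status.Flink.Memory.Managed.*,
Status.JVM.Memory.Metaspace.Used) are assumptions based on FLIP-102 and may
differ in the released version.

#!/usr/bin/env python3
# Minimal sketch: list all TaskManagers, then query a few of the FLIP-102
# memory metrics for each of them. The metric names are assumptions.
import json
import urllib.request

JOBMANAGER = "http://localhost:8081"
METRICS = ",".join([
    "Status.Flink.Memory.Managed.Used",
    "Status.Flink.Memory.Managed.Total",
    "Status.JVM.Memory.Metaspace.Used",
])

def fetch(path):
    with urllib.request.urlopen(JOBMANAGER + path) as resp:
        return json.load(resp)

# /taskmanagers lists all TaskManagers; the metrics endpoint returns
# a list of {"id": ..., "value": ...} objects for the requested names.
for tm in fetch("/taskmanagers")["taskmanagers"]:
    values = fetch("/taskmanagers/{}/metrics?get={}".format(tm["id"], METRICS))
    print(tm["id"], {m["id"]: m["value"] for m in values})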

On Tue, Oct 20, 2020 at 8:23 PM Rajesh Payyappilly Jose 
wrote:

> Classification: *Internal*
>
> Hi,
>
>
>
> Environment - Flink 1.11 on K8s
>
>
>
> Is there a way to monitor the usage of managed memory, off-heap memory and
> network memory?
>
>
>
> -Rajesh
>
>


Flink memory usage monitoring

2020-10-20 Thread Rajesh Payyappilly Jose
Classification: Internal
Hi,

Environment - Flink 1.11 on K8s

Is there a way to monitor the usage of managed memory, off-heap memory and 
network memory?

-Rajesh




Re: Flink memory usage

2017-11-07 Thread Greg Hogan
I’ve used the following simple script to capture Flink metrics by running:
python -u ./statsd_server.py 9020 > statsd_server.log


>>> flink-conf.yaml
metrics.reporters: statsd_reporter
metrics.reporter.statsd_reporter.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.statsd_reporter.host: 
metrics.reporter.statsd_reporter.port: 9020


>>> statsd_server.py
#!/usr/bin/env python

import socket
import sys
import time

if len(sys.argv) < 2:
  print('Usage: {} <port>'.format(sys.argv[0]))
  sys.exit(-1)

UDP_IP = ''              # bind on all interfaces
UDP_PORT = int(sys.argv[1])

# Listen for StatsD datagrams from the Flink reporter and echo each one with a timestamp.
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((UDP_IP, UDP_PORT))

while True:
  data, addr = sock.recvfrom(4096)
  print('{:.6f} {}'.format(time.time(), data))


> On Nov 5, 2017, at 4:40 AM, Jürgen Thomann  
> wrote:
> 
> Can you use wget (curl will work as well)? You can find the taskmanagers with 
> wget -O - http://localhost:8081/taskmanagers
> and wget -O - http://localhost:8081/taskmanagers/<taskmanager-id> to see detailed
> JVM memory stats. localhost:8081 is the jobmanager in my example.
> 
> 
> On 04.11.2017 16:19, AndreaKinn wrote:
>> Anyway, if I understood how the system metrics work (the results seem to be
>> shown in a browser), I can't use them because my cluster is accessible only
>> via SSH from a terminal
>> 
>> 
>> 
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/



Re: Flink memory usage

2017-11-05 Thread Jürgen Thomann
Can you use wget (curl will work as well)? You can find the taskmanagers 
with wget -O - http://localhost:8081/taskmanagers
and wget -O - http://localhost:8081/taskmanagers/<taskmanager-id> to see detailed
JVM memory stats. localhost:8081 is the jobmanager in my example.
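
If you want to poll those endpoints periodically instead of calling wget by hand,
here is a minimal sketch (not from the original thread). It only assumes the
/taskmanagers listing shown above; the layout of the per-TaskManager detail
response varies across Flink versions, so it just logs the raw JSON with a
timestamp.

#!/usr/bin/env python3
# Poll the JobManager REST API and log each TaskManager's detail JSON,
# which contains the JVM memory stats mentioned above.
import json
import time
import urllib.request

JOBMANAGER = "http://localhost:8081"  # the jobmanager, as in the example above

def fetch(path):
    with urllib.request.urlopen(JOBMANAGER + path) as resp:
        return json.load(resp)

while True:
    for tm in fetch("/taskmanagers")["taskmanagers"]:
        detail = fetch("/taskmanagers/" + tm["id"])
        print("{:.0f} {} {}".format(time.time(), tm["id"], json.dumps(detail)))
    time.sleep(10)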


On 04.11.2017 16:19, AndreaKinn wrote:

Anyway, if I understood how the system metrics work (the results seem to be
shown in a browser), I can't use them because my cluster is accessible only
via SSH from a terminal



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink memory usage

2017-11-04 Thread AndreaKinn
Anyway, if I understood how the system metrics work (the results seem to be
shown in a browser), I can't use them because my cluster is accessible only
via SSH from a terminal



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink memory usage

2017-11-04 Thread AndreaKinn
I have used the sysstat Linux tool.

Flink is the only application running on the node. Could the outcomes measured
with the metrics system be different?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink memory usage

2017-11-04 Thread Kien Truong

Hi,

How did you measure the memory usage ?

JVM processes tend to occupy the maximum memory allocated to them, 
regardless of whether that memory is actively in use or not. To 
correctly measure the memory usage, you should use Flink's metric system [1].


Regards,

Kien

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html#system-metrics


On 11/4/2017 3:15 AM, AndreaKinn wrote:

Hi,
I would like to share some considerations about Flink memory consumption.
I have a cluster composed of three nodes: one used both as JM and TM, and the
other two as TMs.

I ran two identical applications (at different times) on it. The only
difference is that in the second one I doubled every operator, essentially
to check how resource usage changes.

Analysing the outcomes on the CPU side, the effort is effectively doubled.
Doing the same with memory I got these results:



which to me seems completely counterintuitive, since the results are
essentially equal.
I can imagine that in the second case the memory was effectively almost full,
but why does Flink take so much memory even in the first case?
How is this behaviour explained?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Flink memory usage

2017-11-03 Thread AndreaKinn
Hi,
I would like to share some considerations about Flink memory consumption.
I have a cluster composed of three nodes: one used both as JM and TM, and the
other two as TMs.

I ran two identical applications (at different times) on it. The only
difference is that in the second one I doubled every operator, essentially
to check how resource usage changes.

Analysing the outcomes on the CPU side, the effort is effectively doubled.
Doing the same with memory I got these results:


 

which to me seems completely counterintuitive, since the results are
essentially equal.
I can imagine that in the second case the memory was effectively almost full,
but why does Flink take so much memory even in the first case?
How is this behaviour explained?



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Flink memory usage

2017-04-21 Thread Till Rohrmann
Hi Billy,

if it is possible for you to share some parts of your code privately with
me, I can try to figure out what's going wrong.

Cheers,
Till

On Thu, Apr 20, 2017 at 6:00 PM, Newport, Billy <billy.newp...@gs.com>
wrote:

> Ok
>
> The consensus seems to be that it's us, not Flink :-) So we'll look harder
> at what we're doing in case there is anything silly. We are using 16K
> network buffers BTW, which is around 0.5GB with the defaults.
>
>
>
>
>
> *From:* Till Rohrmann [mailto:trohrm...@apache.org]
> *Sent:* Thursday, April 20, 2017 11:52 AM
> *To:* Stefano Bortoli
> *Cc:* Newport, Billy [Tech]; Fabian Hueske; user@flink.apache.org
>
> *Subject:* Re: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> if you didn't split the different data sets up into different slot sharing
> groups, then your maximum parallelism is 40. Thus, it should be enough to
> assign 40^2 * 20 * 4 = 128000 network buffers. If that is not enough,
> because you have more than 4 shuffling steps running in parallel, then you
> have to increase the last term.
>
>
>
> OOM exceptions should actually only occur due to user code objects. Given
> that you have reserved a massive amount of memory for the network buffers
> the remaining heap for the user code is probably very small. Try whether
> you can decrease the number of network buffers. Moreover, check whether
> your user code keeps somewhere references to objects which could cause the
> OOM.
>
>
>
> Cheers,
>
> Till
>
>
>
> On Thu, Apr 20, 2017 at 5:42 PM, Stefano Bortoli <
> stefano.bort...@huawei.com> wrote:
>
> I think that if you have a lot of memory available, the GC gets kind of
> lazy. In our case, the issue was just the latency caused by the GC, because
> we were loading more data than could fit in memory. Hence optimizing the
> code gave us a lot of improvements. FlatMaps are also dangerous as objects
> can multiply beyond what you expect, making co-group extremely costly. :-) A
> well-placed distinct() saves a lot of time and memory.
>
>
>
> My point is that having worked with scarce resources I learned that almost
> all the time the issue was my code, not the framework.
>
>
>
> Good luck.
>
>
>
> Stefano
>
>
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com]
> *Sent:* Thursday, April 20, 2017 4:46 PM
> *To:* Stefano Bortoli <stefano.bort...@huawei.com>; 'Fabian Hueske' <
> fhue...@gmail.com>
>
>
> *Cc:* 'user@flink.apache.org' <user@flink.apache.org>
> *Subject:* RE: Flink memory usage
>
>
>
> Your reuse idea kind of implies that it’s a GC generation rate issue, i.e.
> it’s not collecting fast enough so it’s running out of memory versus heap
> that’s actually anchored, right?
>
>
>
>
>
> *From:* Stefano Bortoli [mailto:stefano.bort...@huawei.com
> <stefano.bort...@huawei.com>]
> *Sent:* Thursday, April 20, 2017 10:33 AM
> *To:* Newport, Billy [Tech]; 'Fabian Hueske'
> *Cc:* 'user@flink.apache.org'
> *Subject:* RE: Flink memory usage
>
>
>
> Hi Billy,
>
>
>
> The only suggestion I can give is to check your code very carefully for
> useless variable allocations, and foster reuse as much as possible. Don't
> create a new collection on every map execution, but rather clear and reuse the
> collected output of the flatMap, and so on. In the past we ran long
> processes over lots of data with little memory without problems, and many more
> complex co-groups, joins and so on without any issue.
>
>
>
> My2c. Hope it helps.
>
>
>
> Stefano
>
>
>
> *From:* Newport, Billy [mailto:billy.newp...@gs.com <billy.newp...@gs.com>]
>
> *Sent:* Thursday, April 20, 2017 1:31 PM
> *To:* 'Fabian Hueske' <fhue...@gmail.com>
> *Cc:* 'user@flink.apache.org' <user@flink.apache.org>
> *Subject:* RE: Flink memory usage
>
>
>
> I don't think our functions are memory heavy; they typically are cogroups
> that merge the records on the left with the records on the right.
>
>
>
> We’re currently requiring 720GB of heap to do our processing which frankly
> appears ridiculous to us. Could too much parallelism be causing the
> problem? Looking at:
>
>
>
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html
>
>
>
> If 

RE: Flink memory usage

2017-04-20 Thread Newport, Billy
I don't think our functions are memory heavy; they typically are cogroups that 
merge the records on the left with the records on the right.

We're currently requiring 720GB of heap to do our processing, which frankly 
appears ridiculous to us. Could too much parallelism be causing the problem? 
Looking at:

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Optimal-Configuration-for-Cluster-td5024.html

If we are processing 17 “datasets” in a single job and each has an individual 
parallelism of 40, is that a potential total parallelism of 17*40? Given your 
network buffer calculation of parallelism squared, would that do it, or only if 
we explicitly configure it that way:

taskmanager.network.numberOfBuffers: p ^ 2 * t * 4

where p is the maximum parallelism of the job and t is the number of task 
managers.
You can process more than one parallel task per TM if you configure more than 
one processing slot per machine ( taskmanager.numberOfTaskSlots). The TM will 
divide its memory among all its slots. So it would be possible to start one TM 
for each machine with 100GB+ memory and 48 slots each.
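
As a back-of-the-envelope check of that formula (a sketch; it assumes the default
32 KB network buffer / memory segment size of that era, so adjust if your
configuration differs):

# numberOfBuffers formula quoted above: p^2 * t * 4
SEGMENT_SIZE = 32 * 1024                 # bytes per network buffer (assumed default)

p, t = 40, 20                            # max parallelism, number of task managers
buffers = p ** 2 * t * 4
print(buffers)                           # 128000
print(buffers * SEGMENT_SIZE / 2**30)    # ~3.9 GB of network memory cluster-wide

# For comparison, the 16K buffers mentioned elsewhere in this thread:
print(16384 * SEGMENT_SIZE / 2**30)      # ~0.5 GB, matching "around 0.5GB with the defaults"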

Our pipeline for each dataset looks like this:

Read Avro file -> FlatMap -> Validate each record with a FlatMap ->
Read Parquet -> FlatMap -> Filter live rows -> CoGroup with the validated Avro file above -> }
Read Parquet -> FlatMap -> Filter dead rows ------------------------------------------------> } Union the cogroup with the dead rows and write the result to a Parquet file.

I don’t understand why this logic couldn’t run with a single task manager and 
just take longer. We’re having a lot of trouble trying to change the tuning to 
reduce the memory burn. We run the above pipeline with parallelism 40 for all 
17 datasets in a single job.

We’re running this config now which is not really justifiable for what we’re 
doing.

20 nodes, 2 slots each, parallelism 40, 36GB memory = 720GB of heap…

Thanks

From: Fabian Hueske [mailto:fhue...@gmail.com]
Sent: Wednesday, April 19, 2017 10:52 AM
To: Newport, Billy [Tech]
Cc: user@flink.apache.org
Subject: Re: Flink memory usage

Hi Billy,
Flink's internal operators are implemented to not allocate heap space 
proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building a 
hash table) the data is serialized into managed memory. If all memory is in 
use, Flink starts spilling to disk. This blog post discusses how Flink uses its 
managed memory [1] (still up to date, even though it's almost 2 years old).
The runtime code should actually be quite stable. Most of the code has been there 
for several years (even before Flink was donated to the ASF) and we haven't 
seen many bugs reported for the DataSet runtime. Of course this does not mean 
that the code doesn't contain bugs.

However, Flink does not take care of the user code. For example a 
GroupReduceFunction that collects a lot of data, e.g., in a List on the heap, 
can still kill a program.
I would check if you have user functions that require lots of heap memory.
Also reducing the size of the managed memory to have more heap space available 
might help.
If that doesn't solve the problem, it would be good if you could share some 
details about your job (which operators, which local strategies, how many 
operators) that might help to identify the misbehaving operator.

Thanks, Fabian

[1] https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html


2017-04-19 16:09 GMT+02:00 Newport, Billy 
<billy.newp...@gs.com>:
How does Flink use memory? We're seeing cases when running a job on larger 
datasets where it throws OOM exceptions during the job. We're using the DataSet 
API. Shouldn't Flink be streaming from disk to disk? We work around this by using 
fewer slots, but it seems unintuitive that I need to change these settings given 
Flink != Spark. Why isn't Flink's memory usage constant? Why couldn't I run a 
job with a single task and a single slot for any size of job successfully, other 
than it taking much longer to run?

Thanks
Billy





Re: Flink memory usage

2017-04-19 Thread Fabian Hueske
Hi Billy,

Flink's internal operators are implemented to not allocate heap space
proportional to the size of the input data.
Whenever Flink needs to hold data in memory (e.g., for sorting or building
a hash table) the data is serialized into managed memory. If all memory is
in use, Flink starts spilling to disk. This blog post discusses how Flink
uses its managed memory [1] (still up to date, even though it's almost 2
years old).
The runtime code should actually be quite stable. Most of the code has been
there for several years (even before Flink was donated to the ASF) and we
haven't seen many bugs reported for the DataSet runtime. Of course this
does not mean that the code doesn't contain bugs.

However, Flink does not take care of the user code. For example a
GroupReduceFunction that collects a lot of data, e.g., in a List on the
heap, can still kill a program.

I would check if you have user functions that require lots of heap memory.
Also reducing the size of the managed memory to have more heap space
available might help.
If that doesn't solve the problem, it would be good if you could share some
details about your job (which operators, which local strategies, how many
operators) that might help to identify the misbehaving operator.

Thanks, Fabian

[1]
https://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html
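
To make the "reduce the managed memory" suggestion a bit more concrete, here is a
rough sketch of how a TaskManager heap is split under this (pre-1.10) memory
model. The fraction and segment size below are the defaults I assume for this era
(taskmanager.memory.fraction = 0.7, 32 KB segments, on-heap managed memory); the
exact accounting may differ slightly between versions, so check your flink-conf.yaml.

# Rough heap split for one 36GB TaskManager with 16K network buffers,
# as described elsewhere in this thread. All values are approximations.
GB = 1024 ** 3

tm_heap  = 36 * GB                      # taskmanager.heap.mb
network  = 16384 * 32 * 1024            # 16K network buffers * 32 KB each
managed  = 0.7 * (tm_heap - network)    # managed memory budget taken off the heap
user     = tm_heap - network - managed  # what is left for user functions

print("network   %.1f GB" % (network / GB))   # ~0.5 GB
print("managed   %.1f GB" % (managed / GB))   # ~24.8 GB
print("user code %.1f GB" % (user / GB))      # ~10.6 GB left for user code objects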



2017-04-19 16:09 GMT+02:00 Newport, Billy :

> How does Flink use memory? We're seeing cases when running a job on larger
> datasets where it throws OOM exceptions during the job. We're using the
> DataSet API. Shouldn't Flink be streaming from disk to disk? We work around
> this by using fewer slots, but it seems unintuitive that I need to change these
> settings given Flink != Spark. Why isn't Flink's memory usage constant? Why
> couldn't I run a job with a single task and a single slot for any size of job
> successfully, other than it taking much longer to run?
>
>
>
> Thanks
>
> Billy
>
>
>
>
>