Hello,

I have done some thread checking and taken some thread dumps, and I have disabled checkpointing.

Here are my findings.

I took a thread dump a few hours after booting up the whole cluster (on 2/12/2016; 5 TaskManagers; 3GB heap each; 7GB total limit each).

The dump shows that most threads come from 3 sources.

OutputFlusher -- 634 -- Sleeping State

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
        - None

Metrics -- 376 (Flink metrics reporter; it's the only metrics reporter being used) -- Parked State

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <bcfb9f9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

tend -- 220 (Aerospike client thread) -- Sleeping State

 "tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


I have 2 streaming jobs and a batch job that runs once in a while.

Streaming job A runs with a parallelism of 2 and uses the Aerospike client only in a RichSink.

Streaming job B runs with a parallelism of 24 and uses the Aerospike client in a RichFilterFunction / RichMapFunction, opening and closing the client in the open() and close() methods.

The batch job also uses the Aerospike client in a RichFilterFunction / RichMapFunction, again opening and closing the client in the open() and close() methods.
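
For reference, the open/close pattern in those rich functions is roughly the following (a simplified sketch, not the actual job code; the class name, host and port are placeholders):

import com.aerospike.client.AerospikeClient;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Simplified sketch of the pattern: one AerospikeClient per parallel instance,
// created in open() and closed in close().
public class AerospikeEnrichFunction extends RichMapFunction<String, String> {

    private transient AerospikeClient client;

    @Override
    public void open(Configuration parameters) throws Exception {
        // "aerospike-host" / 3000 are placeholders for the real cluster address
        client = new AerospikeClient("aerospike-host", 3000);
    }

    @Override
    public String map(String value) throws Exception {
        // ... reads / writes against Aerospike happen here ...
        return value;
    }

    @Override
    public void close() throws Exception {
        // Closing the client is what should also stop its "tend" background thread.
        if (client != null) {
            client.close();
        }
    }
}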

Next, I cancelled all the streaming jobs on 5/12/2016 and checked the threads and the JVM non-heap usage.

JVM non-heap usage reaches 3GB and the thread count goes down, but some threads still linger around. They are the following.

Metrics -- 790 (Flink metrics reporter; it's the only metrics reporter being used)

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for <bcfb9f9> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

tend -- 432 (Aerospike client thread)


 "tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


The total number of threads is 1289, of which 1220 are tend + metrics threads. So I have 1220 threads that I believe should be dead and not running, since I have no jobs running at all.

And the JVM non-heap usage doesn't decrease at all after removing every job.


Why the hell do the metrics threads grow without end?

I am using the following libs for metrics:

- metrics-graphite-3.1.0.jar

- metrics-core-3.1.0.jar

- flink-metrics-dropwizard-1.1.3.jar

- flink-metrics-graphite-1.1.3.jar

And the config for the reporter is:

metrics.reporters: graphite
metrics.reporter.graphite.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.graphite.host: CARBONRELAYHOST
metrics.reporter.graphite.port: 2003


Shouldn't the Aerospike client also be closed? Or am I missing something, or doing something wrong?
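
For what it's worth, counting the live "tend" threads inside a TaskManager JVM only needs plain JDK calls; an illustrative helper (not something from the actual jobs) would look like this:

import java.util.Map;

// Illustrative helper: counts live "tend" threads in the current JVM,
// e.g. when called from inside a running rich function or a small debug job.
public final class TendThreadCount {

    public static long liveTendThreads() {
        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
        return stacks.keySet().stream()
                .filter(t -> "tend".equals(t.getName()))
                .count();
    }

    public static void main(String[] args) {
        System.out.println("live 'tend' threads: " + liveTendThreads());
    }
}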


Sorry for the long post.

Best Regards,

Daniel Santos


On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
Hey Daniel!

Thanks for reporting this. Unbounded growth of non-heap memory is not expected. 
 What kind of Threads are you seeing being spawned/lingering around?

As a first step, could you try to disable checkpointing and see how it behaves 
afterwards?

– Ufuk

On 29 November 2016 at 17:32:32, Daniel Santos (dsan...@cryptolab.net) wrote:
Hello,
Nope, I am using Hadoop HDFS as state backend, Kafka as source, and an
HttpClient as a sink, plus Kafka as a sink.
So it's possible that the state backend is the culprit?
The curious thing is that even when no jobs are running, streaming or otherwise,
the JVM non-heap stays the same, which I find odd.
Another curious thing is that it's proportional to an increase in the number of
JVM threads. Whenever there are more JVM threads running, there is also more
JVM non-heap being used, which makes sense. But the threads stick around and
never decrease, and likewise the JVM non-heap memory.
These observations are based on the Flink metrics being sent to and recorded
in our Graphite system.
Best Regards, Daniel Santos

On 11/29/2016 04:04 PM, Cliff Resnick wrote:
Are you using the RocksDB backend in native mode? If so then the
off-heap memory may be there.

On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote:

I have the same problem, but I submit the Flink job to YARN.
I submit the job to YARN from computer 22, and the job runs successfully;
the JobManager is on 79 and the TaskManager is on 69, three different computers.
However, on computer 22 the process with pid=3463, which is the job submitted
to YARN, has 2.3G of memory, 15% of the total.
The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
-ytm 1024 ....
Why does it occupy so much memory on computer 22? The job is running on
computer 79 and computer 69.
What would be the possible causes of such behavior?
Best Regards,
----- Original Message -----
From: Daniel Santos > > >
To: user@flink.apache.org
Subject: JVM Non Heap Memory
Date: 29 November 2016, 22:26


Hello,
Is it common to have high usage of non-heap memory in the JVM?
I am running Flink in a stand-alone cluster and in Docker, with each
Docker container being capped at 6G of memory.
I have been struggling to keep memory usage in check.
The non-heap increases without end. It starts with just 100MB of usage
and after a day it reaches 1.3GB.
Then it eventually reaches 2GB, and eventually the Docker container is killed
because it has reached the memory limit.
My configuration for each Flink task manager is the following:
----------- flink-conf.yaml --------------
taskmanager.heap.mb: 3072
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false
taskmanager.network.numberOfBuffers: 12500
taskmanager.memory.off-heap: false
---------------------------------------------
What would be the possible causes of such behavior?
Best Regards,
Daniel Santos


