Just to note that the bug mentioned by Chesnay does not invalidate Stefan's 
comments. ;-)

Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into the 
upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:
> Hello Daniel,
>  
> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> cleaned up, so the underlying Dropwizard meter update threads were
> not shut down either.
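
To illustrate the kind of leak described above, here is a minimal sketch in plain Java. It is not Flink or Dropwizard code: `TickingMeter` and `MeterLeakDemo` are hypothetical names, and the only thing taken from the thread is that the lingering threads sit in a `ScheduledThreadPoolExecutor`. The sketch assumes each meter owns its own scheduled executor, so a meter that is dropped without being closed leaves one worker thread behind.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a meter that owns a background tick thread. */
final class TickingMeter implements AutoCloseable {
    private final ScheduledExecutorService ticker;
    private volatile Thread worker; // kept so close() can wait for the thread to exit

    TickingMeter(String threadName) {
        ticker = Executors.newSingleThreadScheduledExecutor(r -> {
            worker = new Thread(r, threadName);
            return worker;
        });
        // Scheduling the first task starts the worker thread.
        ticker.scheduleAtFixedRate(() -> { /* update rates */ }, 0, 50, TimeUnit.MILLISECONDS);
    }

    /** Without this call the worker thread stays parked in the executor's queue. */
    @Override
    public void close() throws InterruptedException {
        ticker.shutdownNow();
        Thread t = worker;
        if (t != null) {
            t.join(5000);
        }
    }
}

final class MeterLeakDemo {
    /** Counts live threads with exactly the given name. */
    static long liveThreadsNamed(String name) {
        return Thread.getAllStackTraces().keySet().stream()
                .filter(t -> t.isAlive() && t.getName().equals(name))
                .count();
    }

    public static void main(String[] args) throws Exception {
        TickingMeter first = new TickingMeter("demo-tick");
        TickingMeter second = new TickingMeter("demo-tick");
        System.out.println("live tick threads before close: " + liveThreadsNamed("demo-tick"));
        first.close();
        second.close();
        System.out.println("live tick threads after close: " + liveThreadsNamed("demo-tick"));
    }
}
```

If `close()` is never called, the count only ever goes up, which matches the shape of the thread dumps further down.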
>  
> I've opened a JIRA and will open a PR soon.
>  
> Thank you for reporting this issue.
>  
> Regards,
> Chesnay
>  
> On 05.12.2016 12:05, Stefan Richter wrote:
> > Hi Daniel,
> >
> > the behaviour you observe looks like some threads are not being
> > canceled. Thread cancelation in Flink (and in Java in general) is
> > always cooperative, meaning that the thread you want to cancel has to
> > check for cancelation and react to it. Sometimes this also requires
> > some effort from the client that wants to cancel a thread. So if you
> > implement, e.g., custom operators or functions with Aerospike, you
> > must ensure that they a) react to cancelation and b) clean up their
> > resources. If you do not, your Aerospike client might stay in a
> > blocking call forever; blocking IO calls in particular are prone to
> > this. You need to ensure that cancelation from the client includes
> > closing IO resources such as streams, to unblock the thread and allow
> > it to terminate. This means that your code must (to a certain degree)
> > actively participate in Flink's task lifecycle. In Flink 1.2 we
> > introduce a feature called CloseableRegistry, which makes
> > participating in this lifecycle easier w.r.t. closing resources. For
> > the time being, you should check that Flink's task cancelation also
> > causes your code to close the Aerospike client and check cancelation
> > flags.
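
The point about cooperative cancelation and blocking IO can be sketched in plain Java. This is only an illustration under assumptions: `CancellableWorker` is a hypothetical class, and a `ServerSocket` stands in for whatever blocking resource a client library holds. It uses neither the Flink nor the Aerospike API. The worker checks a flag, and `cancel()` additionally closes the resource so that a thread stuck in a blocking call is released.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

/** Hypothetical worker that blocks on IO and supports cooperative cancelation. */
final class CancellableWorker implements Runnable {
    private final ServerSocket server;        // stand-in for a blocking client resource
    private volatile boolean running = true;  // cancelation flag checked by the loop

    CancellableWorker() throws IOException {
        // Port 0 lets the OS pick a free port on the loopback interface.
        server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
    }

    @Override
    public void run() {
        while (running) {
            try (Socket connection = server.accept()) { // blocks until a peer connects
                // A real worker would handle the connection here.
            } catch (IOException e) {
                // Closing the socket from cancel() makes accept() throw.
                // Only treat it as an error if nobody asked us to stop.
                if (running) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    /** Sets the flag first, then closes the resource to unblock accept(). */
    void cancel() throws IOException {
        running = false;
        server.close();
    }

    public static void main(String[] args) throws Exception {
        CancellableWorker worker = new CancellableWorker();
        Thread thread = new Thread(worker, "blocking-worker");
        thread.start();
        Thread.sleep(200);   // give the thread time to block in accept()
        worker.cancel();
        thread.join(5000);
        System.out.println("worker still alive: " + thread.isAlive());
    }
}
```

Setting the flag alone would not be enough here, because the thread never gets back to the loop condition while it is blocked. In a rich function, the equivalent of `cancel()` would typically be reached from the function's `close` method.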
> >
> > Best,
> > Stefan
> >
> >> On 05.12.2016 at 11:42, Daniel Santos wrote:
> >>
> >> Hello,
> >>
> >> I have done some thread checking and taken some dumps, and I have
> >> disabled checkpointing.
> >>
> >> Here are my findings.
> >>
> >> I did a thread dump a few hours after I booted up the whole cluster.
> >> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> >>
> >> The dump shows that most threads come from 3 sources.
> >>
> >> *OutputFlusher --- 634 -- Sleeping State*
> >>
> >> "OutputFlusher" - Thread t@4758
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >> *Metrics --- 376 (the Flink metrics reporter is the only metrics
> >> component being used) -- Parked State*
> >>
> >> "metrics-meter-tick-thread-1" - Thread t@29024
> >> java.lang.Thread.State: TIMED_WAITING
> >> at sun.misc.Unsafe.park(Native Method)
> >> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >> *tend -- 220 (Aerospike client thread) -- Sleeping State*
> >>
> >> "tend" - Thread t@29011
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >>
> >> I have 2 streaming jobs and a batch Job that runs once in a while.
> >>
> >> Streaming job A runs with a parallelism of 2 and uses Aerospike only
> >> in a RichSink.
> >>
> >> Streaming job B runs with a parallelism of 24 and uses Aerospike in
> >> RichFilterFunction / RichMapFunction with open and close methods, in
> >> order to open and close the client.
> >>
> >> Batch Job runs Aerospike Client in RichFilterFunction /
> >> RichMapFunction with open and close methods in order to open and
> >> close the client.
> >>
> >> Next thing I cancelled all the streaming jobs @5/12/2016 and checked
> >> the threads and the JVM non-heap usage.
> >>
> >> JVM non-heap usage reaches 3GB, threads go down, but some still
> >> linger around and they are the following.
> >>
> >> *Metrics --- 790 (the Flink metrics reporter is the only metrics
> >> component being used)*
> >>
> >> "metrics-meter-tick-thread-1" - Thread t@29024
> >> java.lang.Thread.State: TIMED_WAITING
> >> at sun.misc.Unsafe.park(Native Method)
> >> - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> >> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> >> at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >> at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >> at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >>
> >> *tend -- 432 (Aerospike client thread)*
> >>
> >>
> >> "tend" - Thread t@29011
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >>
> >> The total number of threads is 1289, of which 1220 are tend + metrics.
> >> So I have 1220 threads that I believe should be dead and not
> >> running, since I have no jobs running at all.
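
Counts like the ones above can also be gathered from inside a JVM instead of reading a dump by hand. The following is a rough sketch, not something used in this thread: `ThreadCensus` is a hypothetical helper, and stripping a trailing number from the thread name (so that `metrics-meter-tick-thread-1` and `metrics-meter-tick-thread-2` land in one bucket) is an assumption about how the names should be grouped.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that groups live threads of the current JVM by name. */
final class ThreadCensus {
    /** Strips a trailing "-<number>" (or bare number) so pooled threads share one key. */
    static String normalize(String threadName) {
        return threadName.replaceAll("-?\\d+$", "");
    }

    /** Returns normalized thread name -> number of live threads with that name. */
    static Map<String, Integer> countByName() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(normalize(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    /** Starts a daemon thread that sleeps until interrupted; used only for the demo. */
    static Thread startSleeper(String name) {
        Thread t = new Thread(() -> {
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException e) {
                // Interrupted: fall through and let the thread end.
            }
        }, name);
        t.setDaemon(true);
        t.start();
        return t;
    }

    public static void main(String[] args) {
        startSleeper("census-demo-1");
        startSleeper("census-demo-2");
        countByName().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Sampling this before and after cancelling a job shows which thread names fail to go back down.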
> >>
> >> And the JVM Non-HEAP usage doesn't decrease at all after removing
> >> every job.
> >>
> >>
> >> Why the hell do the metrics threads keep growing without end?
> >>
> >> I am using the following libs for metrics :
> >>
> >> - metrics-graphite-3.1.0.jar
> >>
> >> - metrics-core-3.1.0.jar
> >>
> >> - flink-metrics-dropwizard-1.1.3.jar
> >>
> >> - flink-metrics-graphite-1.1.3.jar
> >>
> >> And the config for reporter is :
> >>
> >> metrics.reporters: graphite
> >> metrics.reporter.graphite.class:
> >> org.apache.flink.metrics.graphite.GraphiteReporter
> >> metrics.reporter.graphite.host: CARBONRELAYHOST
> >> metrics.reporter.graphite.port: 2003
> >>
> >>
> >> Shouldn't the Aerospike client also be closed? Or am I missing
> >> something, or doing something wrong?
> >>
> >>
> >> Sorry for the long post.
> >>
> >> Best Regards,
> >>
> >> Daniel Santos
> >>
> >>
> >> On 11/29/2016 04:57 PM, Ufuk Celebi wrote:
> >>> Hey Daniel!
> >>>
> >>> Thanks for reporting this. Unbounded growth of non-heap memory is not
> >>> expected. What kind of threads are you seeing being spawned/lingering
> >>> around?
> >>>
> >>> As a first step, could you try to disable checkpointing and see how it 
> >>> behaves afterwards?  
> >>>
> >>> – Ufuk
> >>>
> >>> On 29 November 2016 at 17:32:32, Daniel Santos (dsan...@cryptolab.net) 
> >>> wrote:  
> >>>> Hello,
> >>>>
> >>>> No, I am using Hadoop HDFS as the state backend, Kafka as a source, and
> >>>> an HttpClient as a sink, as well as Kafka as a sink.
> >>>> So is it possible that the state backend is the culprit?
> >>>>
> >>>> The curious thing is that even when no jobs are running, streaming or
> >>>> otherwise, the JVM Non-HEAP usage stays the same,
> >>>> which I find odd.
> >>>>
> >>>> Another curious thing is that it is proportional to the number of JVM
> >>>> threads.
> >>>> Whenever there are more JVM threads running, more JVM Non-HEAP memory
> >>>> is used as well, which makes sense.
> >>>> But the threads stick around and never decrease, and likewise the JVM
> >>>> Non-HEAP memory.
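
The correlation described above can be checked directly with the standard `java.lang.management` beans. This is a minimal sketch, assuming it runs inside the JVM of interest; `JvmSample` is a hypothetical name and is not part of Flink's metrics system.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

/** Hypothetical one-shot sample of live thread count and non-heap usage. */
final class JvmSample {
    final int liveThreads;
    final long nonHeapUsedBytes;

    private JvmSample(int liveThreads, long nonHeapUsedBytes) {
        this.liveThreads = liveThreads;
        this.nonHeapUsedBytes = nonHeapUsedBytes;
    }

    /** Reads both values from the platform MXBeans of the current JVM. */
    static JvmSample take() {
        int threads = ManagementFactory.getThreadMXBean().getThreadCount();
        MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        return new JvmSample(threads, nonHeap.getUsed());
    }

    public static void main(String[] args) {
        JvmSample sample = take();
        System.out.println("live threads:   " + sample.liveThreads);
        System.out.println("non-heap bytes: " + sample.nonHeapUsedBytes);
    }
}
```

Logging such a sample periodically, for example before and after cancelling a job, gives the same two curves without going through the reporter.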
> >>>>
> >>>> These observations are based on the Flink metrics that are sent to
> >>>> and recorded in our Graphite system.
> >>>>
> >>>> Best Regards,
> >>>>
> >>>> Daniel Santos
> >>>>
> >>>> On 11/29/2016 04:04 PM, Cliff Resnick wrote:
> >>>>> Are you using the RocksDB backend in native mode? If so then the
> >>>>> off-heap memory may be there.
> >>>>>
> >>>>> On Tue, Nov 29, 2016 at 9:54 AM, > > > wrote:
> >>>>>
> >>>>> I have the same problem, but I submit the Flink job to YARN.
> >>>>> I submit the job from computer 22 and it runs successfully; the
> >>>>> JobManager is on 79 and the TaskManager is on 69, so they are
> >>>>> three different computers.
> >>>>> However, on computer 22, the process with pid=3463, which is the one
> >>>>> that submitted the job to YARN, uses 2.3 GB of memory, 15% of the total.
> >>>>> The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
> >>>>> -ytm 1024 ....
> >>>>> Why does computer 22 occupy so much memory? The job is running on
> >>>>> computer 79 and computer 69.
> >>>>> What would be the possible causes of such behavior ?
> >>>>> Best Regards,
> >>>>> ----- Original Message -----
> >>>>> From: Daniel Santos
> >>>>> To: user@flink.apache.org
> >>>>> Subject: JVM Non Heap Memory
> >>>>> Date: 29 November 2016, 22:26
> >>>>>
> >>>>>
> >>>>> Hello,
> >>>>> Is it common to have high non-heap usage in the JVM?
> >>>>> I am running Flink in a stand-alone cluster and in Docker, with each
> >>>>> container being capped at 6G of memory.
> >>>>> I have been struggling to keep memory usage in check.
> >>>>> The non-heap usage increases without end. It starts with just 100MB
> >>>>> and after a day it reaches 1.3GB.
> >>>>> Then it eventually reaches 2GB, and eventually the container is
> >>>>> killed because it has reached the memory limit.
> >>>>> My configuration for each flink task manager is the following :
> >>>>> ----------- flink-conf.yaml --------------
> >>>>> taskmanager.heap.mb: 3072
> >>>>> taskmanager.numberOfTaskSlots: 8
> >>>>> taskmanager.memory.preallocate: false
> >>>>> taskmanager.network.numberOfBuffers: 12500
> >>>>> taskmanager.memory.off-heap: false
> >>>>> ---------------------------------------------
> >>>>> What would be the possible causes of such behavior ?
> >>>>> Best Regards,
> >>>>> Daniel Santos
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>
> >
>  
>  
