Re: JVM Non Heap Memory

2016-12-05 Thread Chesnay Schepler

Hey Daniel,

the fix won't make it into 1.1.4, since it is only relevant if you're
using Flink Meters together with either the Graphite or the Ganglia
reporter. The Meter metric, however, is not available in 1.1 at all,
so it can't be the underlying cause.

My fix is only for 1.2, where the fixed issue could have caused this
behavior.

Now, for clarification: the "metrics-meter-tick-thread-X" threads are
not created by Flink. With Meters out of the picture, I therefore think
this is not an issue in Flink's metric system.

Instead, I believe Kafka may be the culprit. I found a similar
description here:

https://issues.apache.org/jira/browse/KAFKA-1521

Which Kafka version are you using? Kafka internally also uses the
DropWizard library, and a particular version (2.2.0) of it is
apparently known to leak threads.
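
To see which thread groups keep growing, the live threads can be counted
by name from inside the JVM. The following is a minimal sketch using only
the JDK; the class name ThreadCensus and its helper methods are made up
for illustration and are not part of Flink or Kafka.

```java
import java.util.Map;
import java.util.TreeMap;

final class ThreadCensus {

    /** Strips a trailing "-<number>" so that pooled threads share one group name. */
    public static String groupName(String threadName) {
        return threadName.replaceAll("-\\d+$", "");
    }

    /** Counts the live threads of the current JVM, grouped by normalised name. */
    public static Map<String, Integer> countByGroup() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupName(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints one line per group: "<count><TAB><group name>".
        countByGroup().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

This only sees the JVM it runs in. For a running TaskManager, the same
grouping could be applied to the thread names in a dump taken with a tool
such as jstack.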


Regards,
Chesnay

On 05.12.2016 17:30, Ufuk Celebi wrote:

Quick question: since the Meter issue does _not_ apply to 1.1.3, which Flink
metrics are you using?

– Ufuk

On 5 December 2016 at 16:44:47, Daniel Santos (dsan...@cryptolab.net) wrote:

Hello,
  
Thank you all for the kind replies.

I've got the general idea. I am using Flink version 1.1.3.

So it seems the fix for Meters won't make it into 1.1.4?
  
Best Regards,
  
Daniel Santos
  
  
On 12/05/2016 01:28 PM, Chesnay Schepler wrote:

We don't have to include it in 1.1.4 since Meters do not exist in
1.1; my bad for tagging it in JIRA for 1.1.4.

On 05.12.2016 14:18, Ufuk Celebi wrote:

Just to note that the bug mentioned by Chesnay does not invalidate
Stefan's comments. ;-)

Chesnay's issue is here:
https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into
the upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org)
wrote:

Hello Daniel,

I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, so the underlying dropwizard meter update threads were not
shut down either.

I've opened a JIRA and will open a PR soon.

Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan


On 05.12.2016 at 11:42, Daniel Santos wrote:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads come from 3 sources.

*OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
        - None

*Metrics --- 376 ( Flink Metrics Reporter is the only metrics being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)

at

Re: JVM Non Heap Memory

2016-12-05 Thread Ufuk Celebi
Quick question: since the Meter issue does _not_ apply to 1.1.3, which Flink
metrics are you using?

– Ufuk

On 5 December 2016 at 16:44:47, Daniel Santos (dsan...@cryptolab.net) wrote:
> Hello,
>  
> Thank you all for the kind replies.
>
> I've got the general idea. I am using Flink version 1.1.3.
>
> So it seems the fix for Meters won't make it into 1.1.4?
>  
> Best Regards,
>  
> Daniel Santos
>  
>  
> On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
> > We don't have to include it in 1.1.4 since Meters do not exist in
> > 1.1; my bad for tagging it in JIRA for 1.1.4.
> >
> > On 05.12.2016 14:18, Ufuk Celebi wrote:
> >> Just to note that the bug mentioned by Chesnay does not invalidate
> >> Stefan's comments. ;-)
> >>
> >> Chesnay's issue is here:
> >> https://issues.apache.org/jira/browse/FLINK-5261
> >>
> >> I added an issue to improve the documentation about cancellation
> >> (https://issues.apache.org/jira/browse/FLINK-5260).
> >>
> >> Which version of Flink are you using? Chesnay's fix will make it into
> >> the upcoming 1.1.4 release.
> >>
> >>
> >> On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org)
> >> wrote:
> >>> Hello Daniel,
> >>> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> >>> cleaned up, causing the underlying dropwizard meter update threads to
> >>> not be shutdown either.
> >>> I've opened a JIRA
> >>> and will open a PR soon.
> >>> Thank you for reporting this issue.
> >>> Regards,
> >>> Chesnay
> >>> On 05.12.2016 12:05, Stefan Richter wrote:
>  Hi Daniel,
> 
>  the behaviour you observe looks like some threads are not canceled.
>  Thread cancelation in Flink (and Java in general) is always
>  cooperative, where cooperative means that the thread you want to
>  cancel should somehow check cancelation and react to it. Sometimes
>  this also requires some effort from the client that wants to cancel a
>  thread. So if you implement e.g. custom operators or functions with
>  aerospike, you must ensure that they a) react on cancelation and b)
>  cleanup their resources. If you do not consider this, your aerospike
>  client might stay in a blocking call forever, in particular blocking
>  IO calls are prone to this. What you need to ensure is that
>  cancelation from the clients includes closing IO resources such as
>  streams to unblock the thread and allow for termination. This means
>  that your code must (to a certain degree) actively
>  participate in Flink's task lifecycle. In Flink 1.2 we introduce a
>  feature called CloseableRegistry, which makes participating in this
>  lifecycle easier w.r.t. closing resources. For the time being, you
>  should check that Flink’s task cancelation also causes your code to
>  close the aerospike client and check cancelation flags.
> 
>  Best,
>  Stefan
> 
> > On 05.12.2016 at 11:42, Daniel Santos wrote:
> >
> > Hello,
> >
> > I have done some threads checking and dumps. And I have disabled the
> > checkpointing.
> >
> > Here are my findings.
> >
> > I did a thread dump a few hours after I booted up the whole cluster.
> > (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> >
> > The dump shows that most threads are of 3 sources.
> > *
> > **OutputFlusher --- 634 -- Sleeping State*
> >
> > "OutputFlusher" - Thread t@4758
> > java.lang.Thread.State: TIMED_WAITING
> > at java.lang.Thread.sleep(Native Method)
> > at
> > org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >   
> >
> >
> > Locked ownable synchronizers:
> > - None
> > *
> > **Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
> > being used ) -- Parked State*
> >
> > "metrics-meter-tick-thread-1" - Thread t@29024
> > java.lang.Thread.State: TIMED_WAITING
> > at sun.misc.Unsafe.park(Native Method)
> > - parking to wait for (a
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)  
> >
> > at
> > java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)  
> >
> > at
> > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >   
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >
> > at
> > java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >   
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >   
> >
> > at
> > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >   
> >
> > at
> > 

Re: JVM Non Heap Memory

2016-12-05 Thread Daniel Santos

Hello,

Thank you all for the kind replies.

I've got the general idea. I am using Flink version 1.1.3.

So it seems the fix for Meters won't make it into 1.1.4?

Best Regards,

Daniel Santos


On 12/05/2016 01:28 PM, Chesnay Schepler wrote:
We don't have to include it in 1.1.4 since Meters do not exist in
1.1; my bad for tagging it in JIRA for 1.1.4.


On 05.12.2016 14:18, Ufuk Celebi wrote:
Just to note that the bug mentioned by Chesnay does not invalidate 
Stefan's comments. ;-)


Chesnay's issue is here: 
https://issues.apache.org/jira/browse/FLINK-5261


I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).


Which version of Flink are you using? Chesnay's fix will make it into 
the upcoming 1.1.4 release.



On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) 
wrote:

Hello Daniel,

I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, so the underlying dropwizard meter update threads were not
shut down either.

I've opened a JIRA and will open a PR soon.

Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan


On 05.12.2016 at 11:42, Daniel Santos wrote:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads come from 3 sources.

*OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
        - None

*Metrics --- 376 ( Flink Metrics Reporter is the only metrics being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

*tend -- 220 ( Aerospike Client Thread ) -- Sleeping State*

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


I have 2 streaming jobs and a batch Job that runs once in a while.

Streaming job A runs with a parallel of 2 and runs Aerospike only in
RichSink .

Streaming job B runs with a parallel of 24 and runs Aerospike in
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.

Batch Job runs Aerospike Client in RichFilterFunction /
RichMapFunction with open and close methods in order to open and
close the client.

Next thing 

Re: JVM Non Heap Memory

2016-12-05 Thread Chesnay Schepler
We don't have to include it in 1.1.4 since Meters do not exist in 1.1;
my bad for tagging it in JIRA for 1.1.4.


On 05.12.2016 14:18, Ufuk Celebi wrote:

Just to note that the bug mentioned by Chesnay does not invalidate Stefan's 
comments. ;-)

Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into the 
upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:

Hello Daniel,

I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, so the underlying dropwizard meter update threads were not
shut down either.

I've opened a JIRA and will open a PR soon.

Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled.
Thread cancelation in Flink (and Java in general) is always
cooperative, where cooperative means that the thread you want to
cancel should somehow check cancelation and react to it. Sometimes
this also requires some effort from the client that wants to cancel a
thread. So if you implement e.g. custom operators or functions with
aerospike, you must ensure that they a) react on cancelation and b)
cleanup their resources. If you do not consider this, your aerospike
client might stay in a blocking call forever, in particular blocking
IO calls are prone to this. What you need to ensure is that
cancelation from the clients includes closing IO resources such as
streams to unblock the thread and allow for termination. This means
that your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a
feature called CloseableRegistry, which makes participating in this
lifecycle easier w.r.t. closing resources. For the time being, you
should check that Flink’s task cancelation also causes your code to
close the aerospike client and check cancelation flags.

Best,
Stefan


On 05.12.2016 at 11:42, Daniel Santos wrote:

Hello,

I have done some threads checking and dumps. And I have disabled the
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster.
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )

The dump shows that most threads come from 3 sources.

*OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
        - None

*Metrics --- 376 ( Flink Metrics Reporter is the only metrics being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

*tend -- 220 ( Aerospike Client Thread ) -- Sleeping State*

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


I have 2 streaming jobs and a batch Job that runs once in a while.

Streaming job A runs with a parallel of 2 and runs Aerospike only in
RichSink .

Streaming job B runs with a parallel of 24 and runs Aerospike in
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.

Batch Job runs Aerospike Client in RichFilterFunction /
RichMapFunction with open and close methods in order to open and
close the client.

Next thing I cancelled all the streaming jobs @5/12/2016 and checked
the threads and the JVM non-heap usage.

JVM non-heap usage reaches 3GB, threads go down, but some still
linger around and they are the following.

*Metrics --- 790 ( Flink Metrics Reporter it's the only 

Re: JVM Non Heap Memory

2016-12-05 Thread Ufuk Celebi
Just to note that the bug mentioned by Chesnay does not invalidate Stefan's 
comments. ;-)

Chesnay's issue is here: https://issues.apache.org/jira/browse/FLINK-5261

I added an issue to improve the documentation about cancellation 
(https://issues.apache.org/jira/browse/FLINK-5260).

Which version of Flink are you using? Chesnay's fix will make it into the 
upcoming 1.1.4 release.


On 5 December 2016 at 14:04:49, Chesnay Schepler (ches...@apache.org) wrote:
> Hello Daniel,
>  
> I'm afraid you stumbled upon a bug in Flink. Meters were not properly
> cleaned up, causing the underlying dropwizard meter update threads to
> not be shutdown either.
>  
> I've opened a JIRA  
> and will open a PR soon.
>  
> Thank you for reporting this issue.
>  
> Regards,
> Chesnay
>  
> On 05.12.2016 12:05, Stefan Richter wrote:
> > Hi Daniel,
> >
> > the behaviour you observe looks like some threads are not canceled.
> > Thread cancelation in Flink (and Java in general) is always
> > cooperative, where cooperative means that the thread you want to
> > cancel should somehow check cancelation and react to it. Sometimes
> > this also requires some effort from the client that wants to cancel a
> > thread. So if you implement e.g. custom operators or functions with
> > aerospike, you must ensure that they a) react on cancelation and b)
> > cleanup their resources. If you do not consider this, your aerospike
> > client might stay in a blocking call forever, in particular blocking
> > IO calls are prone to this. What you need to ensure is that
> > cancelation from the clients includes closing IO resources such as
> > streams to unblock the thread and allow for termination. This means
> > that your code must (to a certain degree) actively
> > participate in Flink's task lifecycle. In Flink 1.2 we introduce a
> > feature called CloseableRegistry, which makes participating in this
> > lifecycle easier w.r.t. closing resources. For the time being, you
> > should check that Flink’s task cancelation also causes your code to
> > close the aerospike client and check cancelation flags.
> >
> > Best,
> > Stefan
> >
> >> On 05.12.2016 at 11:42, Daniel Santos wrote:
> >>
> >> Hello,
> >>
> >> I have done some threads checking and dumps. And I have disabled the
> >> checkpointing.
> >>
> >> Here are my findings.
> >>
> >> I did a thread dump a few hours after I booted up the whole cluster.
> >> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> >>
> >> The dump shows that most threads are of 3 sources.
> >> *
> >> **OutputFlusher --- 634 -- Sleeping State*
> >>
> >> "OutputFlusher" - Thread t@4758
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at
> >> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> >>   
> >>
> >> Locked ownable synchronizers:
> >> - None
> >> *
> >> **Metrics --- 376 ( Flink Metrics Reporter it's the only metrics
> >> being used ) -- Parked State*
> >>
> >> "metrics-meter-tick-thread-1" - Thread t@29024
> >> java.lang.Thread.State: TIMED_WAITING
> >> at sun.misc.Unsafe.park(Native Method)
> >> - parking to wait for (a
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)  
> >> at
> >> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)  
> >> at
> >> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> >>   
> >> at
> >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> >> at
> >> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> >>   
> >> at
> >> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> >>   
> >> at
> >> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> >>   
> >> at
> >> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>   
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >> *
> >> *
> >>
> >> *tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
> >> *
> >>
> >> "tend" - Thread t@29011
> >> java.lang.Thread.State: TIMED_WAITING
> >> at java.lang.Thread.sleep(Native Method)
> >> at com.aerospike.client.util.Util.sleep(Util.java:38)
> >> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> >> at java.lang.Thread.run(Thread.java:745)
> >>
> >> Locked ownable synchronizers:
> >> - None
> >>
> >>
> >> I have 2 streaming jobs and a batch Job that runs once in a while.
> >>
> >> Streaming job A runs with a parallel of 2 and runs Aerospike only in
> >> RichSink .
> >>
> >> Streaming job B runs with a parallel of 24 and runs Aerospike in
> >> RichFilterFunction / RichMapFunction with open and close methods, in
> >> order to open and close the client.
> >>
> >> Batch Job runs Aerospike Client in 

Re: JVM Non Heap Memory

2016-12-05 Thread Chesnay Schepler

Hello Daniel,

I'm afraid you stumbled upon a bug in Flink. Meters were not properly
cleaned up, so the underlying dropwizard meter update threads were not
shut down either.
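
As an illustration of this kind of leak, here is a minimal JDK-only
sketch. It does not use dropwizard or Flink: a plain
ScheduledExecutorService stands in for the meter's tick thread, and the
class name TickThreadLeak, the thread name, and the 5-second period are
assumptions chosen for the demo. The worker thread stays alive until the
scheduler is shut down explicitly, which is the cleanup step that goes
missing in a leak like this.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class TickThreadLeak {

    /** Starts a scheduler whose single worker thread carries the given name. */
    public static ScheduledExecutorService startTicker(String threadName) {
        ScheduledExecutorService ticker =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, threadName));
        // A periodic no-op stands in for a meter's rate update.
        ticker.scheduleAtFixedRate(() -> { }, 0, 5, TimeUnit.SECONDS);
        return ticker;
    }

    /** Returns true if a live thread with exactly this name exists. */
    public static boolean isAlive(String threadName) {
        return Thread.getAllStackTraces().keySet().stream()
                .anyMatch(t -> t.getName().equals(threadName) && t.isAlive());
    }

    /** Polls until the named thread is gone; returns false on timeout or interrupt. */
    public static boolean awaitGone(String threadName, long timeoutMillis) {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (isAlive(threadName)) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String name = "demo-meter-tick-thread-1";
        ScheduledExecutorService ticker = startTicker(name);
        System.out.println("alive before shutdown: " + isAlive(name));

        // Without this call the thread would linger, as in the thread dump.
        ticker.shutdownNow();
        System.out.println("gone after shutdown: " + awaitGone(name, 5000));
    }
}
```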


I've opened a JIRA and will open a PR soon.

Thank you for reporting this issue.

Regards,
Chesnay

On 05.12.2016 12:05, Stefan Richter wrote:

Hi Daniel,

the behaviour you observe looks like some threads are not canceled. 
Thread cancelation in Flink (and Java in general) is always 
cooperative, where cooperative means that the thread you want to 
cancel should somehow check cancelation and react to it. Sometimes 
this also requires some effort from the client that wants to cancel a 
thread. So if you implement e.g. custom operators or functions with 
aerospike, you must ensure that they a) react on cancelation and b) 
cleanup their resources. If you do not consider this, your aerospike 
client might stay in a blocking call forever, in particular blocking 
IO calls are prone to this. What you need to ensure is that 
cancelation from the clients includes closing IO resources such as 
streams to unblock the thread and allow for termination. This means 
that your code must (to a certain degree) actively 
participate in Flink's task lifecycle. In Flink 1.2 we introduce a 
feature called CloseableRegistry, which makes participating in this 
lifecycle easier w.r.t. closing resources. For the time being, you 
should check that Flink’s task cancelation also causes your code to 
close the aerospike client and check cancelation flags.


Best,
Stefan

On 05.12.2016 at 11:42, Daniel Santos wrote:


Hello,

I have done some threads checking and dumps. And I have disabled the 
checkpointing.


Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster. 
(@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )


The dump shows that most threads come from 3 sources.

*OutputFlusher --- 634 -- Sleeping State*

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
        - None

*Metrics --- 376 ( Flink Metrics Reporter is the only metrics being used ) -- Parked State*

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

*tend -- 220 ( Aerospike Client Thread ) -- Sleeping State*

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


I have 2 streaming jobs and a batch job that runs once in a while.

Streaming job A runs with a parallelism of 2 and uses Aerospike only in
a RichSink.

Streaming job B runs with a parallelism of 24 and uses Aerospike in a
RichFilterFunction / RichMapFunction with open and close methods, in
order to open and close the client.

The batch job uses the Aerospike client in a RichFilterFunction /
RichMapFunction with open and close methods, in order to open and
close the client.

Next, I cancelled all the streaming jobs @5/12/2016 and checked the
threads and the JVM non-heap usage.

JVM non-heap usage reaches 3GB; the threads go down, but some still
linger, namely the following.

*Metrics --- 790 ( Flink Metrics Reporter is the only metrics being used )*


"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at 

Re: JVM Non Heap Memory

2016-12-05 Thread Stefan Richter
Hi Daniel,

the behaviour you observe looks like some threads are not being canceled.
Thread cancelation in Flink (and in Java in general) is always cooperative:
the thread you want to cancel must itself check for cancelation and react to
it. Sometimes this also requires some effort from the client that wants to
cancel a thread. So if you implement, e.g., custom operators or functions
with Aerospike, you must ensure that they a) react to cancelation and
b) clean up their resources. If you do not, your Aerospike client might stay
in a blocking call forever; blocking IO calls in particular are prone to
this. You need to ensure that cancelation from the client side includes
closing IO resources such as streams, to unblock the thread and allow it to
terminate. This means that your code must (to a certain degree) actively
participate in Flink's task lifecycle. In Flink 1.2 we introduce a feature
called CloseableRegistry, which makes participating in this lifecycle easier
w.r.t. closing resources. For the time being, you should check that Flink's
task cancelation also causes your code to close the Aerospike client and to
check cancelation flags.
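
A minimal sketch of the pattern described above, using only the JDK. A
ServerSocket blocked in accept() stands in for a client stuck in
blocking IO; the class CancellableBlockingTask and its methods are
hypothetical and not part of Flink or the Aerospike client. Cancelation
sets a flag and closes the resource, which makes the blocked call throw
so that the worker thread can terminate.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

final class CancellableBlockingTask implements Runnable, Closeable {

    // Stand-in for a client object that performs blocking IO.
    private final ServerSocket resource;
    // Cancelation flag that the worker checks cooperatively.
    private volatile boolean running = true;

    CancellableBlockingTask() throws IOException {
        // Port 0 asks the OS for any free port on the loopback interface.
        resource = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
    }

    @Override
    public void run() {
        while (running) {
            try {
                // Blocks until a peer connects or the socket is closed.
                resource.accept().close();
            } catch (IOException e) {
                // Expected once close() was called; anything else is a real error.
                if (running) {
                    throw new UncheckedIOException(e);
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        running = false;   // a) let the loop observe the cancelation
        resource.close();  // b) release the resource, which unblocks accept()
    }

    /** Starts a worker, cancels it, and reports whether the thread terminated. */
    public static boolean cancelTerminatesWorker() {
        try {
            CancellableBlockingTask task = new CancellableBlockingTask();
            Thread worker = new Thread(task, "blocking-worker");
            worker.start();
            Thread.sleep(200);   // give the worker time to block in accept()
            task.close();
            worker.join(5000);
            return !worker.isAlive();
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("worker terminated: " + cancelTerminatesWorker());
    }
}
```

In a Flink rich function, the close() method would be the natural place
to release the client in the same way.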

Best,
Stefan

> On 05.12.2016 at 11:42, Daniel Santos wrote:
> 
> Hello,
> 
> I have done some threads checking and dumps. And I have disabled the 
> checkpointing.
> Here are my findings. 
> I did a thread dump a few hours after I booted up the whole cluster. 
> (@2/12/2016; 5 TM ; 3GB HEAP each ; 7GB total each as Limit )
> 
> The dump shows that most threads are of 3 sources.
> 
> OutputFlusher --- 634 -- Sleeping State
> 
> "OutputFlusher" - Thread t@4758
>java.lang.Thread.State: TIMED_WAITING
> at java.lang.Thread.sleep(Native Method)
> at 
> org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)
> 
>Locked ownable synchronizers:
> - None
> 
> Metrics --- 376 ( Flink Metrics Reporter it's the only metrics being used ) 
> -- Parked State
> 
> "metrics-meter-tick-thread-1" - Thread t@29024
>java.lang.Thread.State: TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
> at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
> at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
>Locked ownable synchronizers:
> - None
> 
> tend -- 220 ( Aerospike Client Thread ) -- Sleeping State
> 
>  "tend" - Thread t@29011
>java.lang.Thread.State: TIMED_WAITING
> at java.lang.Thread.sleep(Native Method)
> at com.aerospike.client.util.Util.sleep(Util.java:38)
> at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
> at java.lang.Thread.run(Thread.java:745)
> 
>Locked ownable synchronizers:
> - None
> 
> I have 2 streaming jobs and a batch Job that runs once in a while.
> 
> Streaming job A runs with a parallel of 2 and runs Aerospike only in RichSink 
> .
> 
> Streaming job B runs with a parallel of 24 and runs Aerospike in 
> RichFilterFunction / RichMapFunction with open and close methods, in order to 
> open and close the client.
> 
> Batch Job runs Aerospike Client in RichFilterFunction / RichMapFunction with 
> open and close methods in order to open and close the client.
> 
> Next thing I cancelled all the streaming jobs @5/12/2016 and checked the 
> threads and the JVM non-heap usage.
> 
> JVM non-heap usage reaches 3GB, threads go down, but some still linger around 
> and they are the following.
> 
> Metrics --- 790 ( Flink Metrics Reporter it's the only metrics being used ) 
> 
> "metrics-meter-tick-thread-1" - Thread t@29024
>java.lang.Thread.State: TIMED_WAITING
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for  (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
> at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
> at 
> 

Re: JVM Non Heap Memory

2016-12-05 Thread Daniel Santos

Hello,

I have checked the threads and taken some thread dumps, and I have disabled 
checkpointing.

Here are my findings.

I did a thread dump a few hours after I booted up the whole cluster 
(on 2/12/2016; 5 TMs; 3 GB heap each; 7 GB total limit each).

The dump shows that most threads come from 3 sources.

OutputFlusher --- 634 -- Sleeping State

"OutputFlusher" - Thread t@4758
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:164)

   Locked ownable synchronizers:
        - None

Metrics --- 376 ( Flink Metrics Reporter it's the only metrics being used ) -- Parked State

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

tend -- 220 ( Aerospike Client Thread ) -- Sleeping State

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


I have 2 streaming jobs and a batch job that runs once in a while.

Streaming job A runs with a parallelism of 2 and uses Aerospike only in a 
RichSink.

Streaming job B runs with a parallelism of 24 and uses Aerospike in a 
RichFilterFunction / RichMapFunction, with open and close methods that 
open and close the client.

The batch job uses the Aerospike client in a RichFilterFunction / 
RichMapFunction, with open and close methods that open and close the client.
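
As a rough illustration of the open/close pairing described above, here is a 
plain-Java sketch. TendingClient and ClientFunction are hypothetical stand-ins 
(not the Aerospike client and not Flink's rich function classes); the 
assumption is only that the client starts a background thread when it is 
created and that the thread ends when close() is called.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical client that starts a background maintenance thread when it is
// created, similar in spirit to the "tend" thread in the dump. Not the
// Aerospike API.
class TendingClient implements AutoCloseable {
    private final Thread tend;
    private volatile boolean closed = false;

    TendingClient() {
        tend = new Thread(() -> {
            while (!closed) {
                try {
                    TimeUnit.MILLISECONDS.sleep(50);  // periodic housekeeping
                } catch (InterruptedException e) {
                    return;                           // woken up by close()
                }
            }
        }, "tend");
        tend.setDaemon(true);
        tend.start();
    }

    @Override
    public void close() throws InterruptedException {
        closed = true;
        tend.interrupt();  // do not wait for the sleep to elapse
        tend.join(1000);
    }

    boolean tendThreadAlive() {
        return tend.isAlive();
    }
}

// Hypothetical function with the same open()/close() shape as a rich function.
class ClientFunction {
    private TendingClient client;

    void open() {
        client = new TendingClient();
    }

    String map(String value) {
        return value.toUpperCase();
    }

    void close() throws InterruptedException {
        if (client != null) {  // close() may run even if open() failed
            client.close();
            client = null;
        }
    }

    TendingClient client() {
        return client;
    }
}
```

Every open() that is not followed by a close() leaves one such thread behind, 
so whether close() is actually reached on cancellation is the thing to verify.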


Next, I cancelled all the streaming jobs on 5/12/2016 and checked the 
threads and the JVM non-heap usage.


JVM non-heap usage reaches 3 GB. The thread count goes down, but some threads 
still linger around, and they are the following.


Metrics --- 790 ( Flink Metrics Reporter it's the only metrics being used )

"metrics-meter-tick-thread-1" - Thread t@29024
   java.lang.Thread.State: TIMED_WAITING
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
        at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
        at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None

tend -- 432 ( Aerospike Client Thread )

"tend" - Thread t@29011
   java.lang.Thread.State: TIMED_WAITING
        at java.lang.Thread.sleep(Native Method)
        at com.aerospike.client.util.Util.sleep(Util.java:38)
        at com.aerospike.client.cluster.Cluster.run(Cluster.java:262)
        at java.lang.Thread.run(Thread.java:745)

   Locked ownable synchronizers:
        - None


The total number of threads is 1289, of which 1220 are tend + metrics 
threads. So I have 1220 threads that I believe should be dead and not 
running, since I have no jobs running at all.
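
A small sketch of how per-source counts like the ones above could be produced 
from inside a JVM, instead of counting entries in a dump by hand. ThreadCensus 
is a hypothetical helper; it only sees the threads of the JVM it runs in, so 
for a TaskManager it would have to run inside that process (an external thread 
dump remains the alternative).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper: groups the live threads of the current JVM by name.
class ThreadCensus {
    // Strips a trailing numeric suffix such as "-1" so that
    // "metrics-meter-tick-thread-1" and "metrics-meter-tick-thread-2"
    // fall into the same group.
    static String groupName(String threadName) {
        return threadName.replaceAll("[-#\\s]*\\d+$", "");
    }

    // Counts live threads per group name, sorted by name.
    static Map<String, Integer> countByGroup() {
        Map<String, Integer> counts = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            counts.merge(groupName(t.getName()), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        countByGroup().forEach((name, n) -> System.out.println(n + "\t" + name));
    }
}
```

Running this before and after cancelling a job would show which groups shrink 
and which ones stay at the same count.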


And the JVM non-heap usage doesn't decrease at all after removing 
every job.



Why the hell do the metrics threads grow to no end?

I am using the following libs for metrics :

- metrics-graphite-3.1.0.jar

- metrics-core-3.1.0.jar

- flink-metrics-dropwizard-1.1.3.jar

- flink-metrics-graphite-1.1.3.jar

And the config for reporter is :

metrics.reporters: 

Re: JVM Non Heap Memory

2016-11-29 Thread Ufuk Celebi

Hey Daniel!

Thanks for reporting this. Unbounded growth of non-heap memory is not expected. 
What kind of threads are you seeing being spawned or lingering around?

As a first step, could you try to disable checkpointing and see how it behaves 
afterwards?

– Ufuk




Re: JVM Non Heap Memory

2016-11-29 Thread Daniel Santos

Hello,

Nope, I am using Hadoop HDFS as the state backend, Kafka as a source, and an 
HttpClient as a sink, as well as Kafka as a sink.

So is it possible that the state backend is the culprit?

The curious thing is that even when no jobs are running, streaming or 
otherwise, the JVM non-heap usage stays the same, which I find odd.

Another curious thing is that it is proportional to the number of JVM 
threads.
Whenever there are more JVM threads running, more JVM non-heap memory is 
used, which makes sense.
But the threads stick around and never decrease, and likewise the JVM 
non-heap memory.


These observations are based on the Flink metrics that are sent to and 
recorded in our Graphite system.
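
For cross-checking the two values against each other, a minimal sketch that 
reads non-heap usage and the live thread count from the standard JVM 
management beans. NonHeapProbe is a hypothetical name, and it is an assumption 
that the reported metrics are derived from these same beans. It has to run 
inside the JVM being inspected.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;

// Hypothetical probe: prints non-heap usage next to the live thread count.
class NonHeapProbe {
    // Non-heap memory currently used, as reported by the MemoryMXBean.
    static long nonHeapUsedBytes() {
        MemoryUsage nonHeap = ManagementFactory.getMemoryMXBean().getNonHeapMemoryUsage();
        return nonHeap.getUsed();
    }

    // Number of live threads (daemon and non-daemon) in this JVM.
    static int liveThreads() {
        return ManagementFactory.getThreadMXBean().getThreadCount();
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            System.out.printf("non-heap used: %d KiB, live threads: %d%n",
                    nonHeapUsedBytes() / 1024, liveThreads());
            Thread.sleep(1000);
        }
    }
}
```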


Best Regards,

Daniel Santos

On 11/29/2016 04:04 PM, Cliff Resnick wrote:
Are you using the RocksDB backend in native mode? If so then the 
off-heap memory may be there.


On Tue, Nov 29, 2016 at 9:54 AM, > wrote:


I have the same problem, but I run the Flink job on YARN.
I submitted the job to YARN from computer 22 and it runs successfully;
the JobManager is on computer 79 and the TaskManager is on computer 69,
so they are three different computers.
However, on computer 22, the process with pid=3463, which is the job that
was submitted to YARN, uses 2.3 GB of memory, 15% of the total.
The command is: ./flink run -m yarn-cluster -yn 1 -ys 1 -yjm 1024
-ytm 1024 
Why does it occupy so much memory on computer 22, when the job is running
on computer 79 and computer 69?
What would be the possible causes of such behavior?
Best Regards,
- Original Message -
From: Daniel Santos
To: user@flink.apache.org
Subject: JVM Non Heap Memory
Date: 2016-11-29 22:26


Hello,
Is it common to have high non-heap usage in the JVM?
I am running Flink in a stand-alone cluster in Docker, with each
container being capped at 6 GB of memory.
I have been struggling to keep memory usage in check.
The non-heap usage grows without end. It starts at just 100 MB and
after a day it reaches 1.3 GB.
Then it eventually reaches 2 GB, and eventually the container is
killed because it has reached the memory limit.
My configuration for each Flink TaskManager is the following:
--- flink-conf.yaml --
taskmanager.heap.mb: 3072
taskmanager.numberOfTaskSlots: 8
taskmanager.memory.preallocate: false
taskmanager.network.numberOfBuffers: 12500
taskmanager.memory.off-heap: false
-
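
As a back-of-the-envelope check of the network buffer setting above, a tiny 
sketch that multiplies the number of buffers by the buffer size. It assumes 
the default memory segment size of 32768 bytes (the config shown does not 
override it); NetworkBufferBudget is a hypothetical name. Whether this memory 
counts toward the heap or is allocated outside of it depends on the Flink 
version and memory settings, so this only gives the size, not where it lives.

```java
// Hypothetical helper: size of the network buffer pool in bytes.
class NetworkBufferBudget {
    static long bytes(long numberOfBuffers, long segmentSizeBytes) {
        return numberOfBuffers * segmentSizeBytes;
    }

    public static void main(String[] args) {
        // 12500 buffers from the config above; 32768 bytes is the assumed
        // default segment size.
        long total = bytes(12_500, 32_768);  // 409,600,000 bytes
        System.out.printf("network buffers: %.1f MiB%n", total / (1024.0 * 1024.0));
    }
}
```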
What would be the possible causes of such behavior ?
Best Regards,
Daniel Santos