Re: Re: NPE in JobManager

2017-01-20 Thread Dave Marion
Fixing my accumulator did the trick. I should note that the JobManager did not 
fail when I ran this previously against Flink 1.1.3. Thanks for the help!

Dave
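
The thread does not show what the accumulator fix actually was. As a hedged sketch only, assuming the ConcurrentModificationException came from the accumulator's TreeMap being serialized while another thread was still adding to it, one option is to guard updates and serialization with the same lock. The class name SynchronizedSortedCounts and its methods are hypothetical; this is plain Java with no Flink dependency, and it is not the SortedStringAccumulator code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

// Hypothetical illustration class; not Flink's API and not the code from the thread.
public class SynchronizedSortedCounts implements Serializable {
    private static final long serialVersionUID = 1L;

    private final TreeMap<String, Integer> counts = new TreeMap<>();

    // Updates take the object's monitor.
    public synchronized void add(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    // Returns a copy so callers never iterate the live map.
    public synchronized TreeMap<String, Integer> snapshot() {
        return new TreeMap<>(counts);
    }

    // Java serialization calls this hook; holding the same monitor keeps
    // add() from modifying the map while its entries are being written.
    private synchronized void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
    }

    // Serializes and deserializes the object in memory.
    static SynchronizedSortedCounts roundTrip(SynchronizedSortedCounts in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SynchronizedSortedCounts) ois.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedSortedCounts acc = new SynchronizedSortedCounts();
        // One thread keeps adding while the main thread serializes repeatedly.
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                acc.add("key-" + (i % 100));
            }
        });
        writer.start();
        for (int i = 0; i < 50; i++) {
            roundTrip(acc);
        }
        writer.join();
        System.out.println("distinct keys: " + roundTrip(acc).snapshot().size());
    }
}
```

Whether locking is the right choice depends on how the accumulator is actually shared; copying the map before handing it out for serialization would be another way to get the same effect.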


> On January 20, 2017 at 8:45 AM Dave Marion <dlmar...@comcast.net> wrote:
> 
> I do see that message in one of the task manager logs 20ms before the NPE 
> in the JobManager. Looking in that log, there is a 
> ConcurrentModificationException in TreeMap, which my accumulator uses. I'll 
> track this down, thanks for the pointer.
> 
> 
> > On January 20, 2017 at 8:27 AM Stephan Ewen <se...@apache.org> wrote:
> > 
> > Hi!
> > 
> > My current assumption is that there is an accumulator that cannot 
> > be serialized. The SortedStringAccumulator looks fine at first glance, 
> > but are there other accumulators involved?
> > Do you see a message like this one in the log of one of the 
> > TaskManagers
> > 
> > "Failed to serialize accumulators for task."
> > 
> > with an exception stack trace?
> > 
> > 
> > Stephan
> > 
> > 
> > 
> > On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmar...@comcast.net> wrote:
> > 
> > > Stephan,
> > > 
> > > Thanks for looking at this. Could you elaborate on the 
> > > misbehavior in the accumulator? I'd like to fix it if it's incorrect.
> > > 
> > > Dave
> > > 
> > > 
> > > 
> > > > On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
> > > > 
> > > > Hi!
> > > > 
> > > > It seems that the accumulator behaves in a non-standard 
> > > > way, but the JobManager should also catch that (log a warning or debug 
> > > > message) and simply continue (not crash).
> > > > 
> > > > I'll try to add a patch so that the JobManager tolerates 
> > > > these kinds of issues in the accumulators.
> > > > 
> > > > Stephan
> > > > 
> > > > 
> > > > On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmar...@comcast.net> wrote:
> > > > 
> > > > > Noticed I didn't cc the user list.
> > > > > 
> > > > > ---------- Original Message ----------
> > > > From: Dave Marion <dlmar...@comcast.net>
> > > > To: Ted Yu <yuzhih...@gmail.com>
> > > > Date: January 19, 2017 at 12:13 PM
> > > > Subject: Re: NPE in JobManager
> > > > 
> > > > 
> > > > That might take some time. Here are the top N lines, typed by 
> > > > hand. If that is not enough, let me know and I will start the 
> > > > process of getting the full stack trace.
> > > > 
> > > > 
> > > > NullPointerException
> > > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> > > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> > > > at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
> > > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > > > at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> > > > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)

Re: Re: NPE in JobManager

2017-01-20 Thread Dave Marion
I do see that message in one of the task manager logs 20ms before the NPE in 
the JobManager. Looking in that log, there is a ConcurrentModificationException 
in TreeMap, which my accumulator uses. I'll track this down, thanks for the 
pointer.
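
For context, TreeMap iterators are fail-fast: if the map is structurally modified after an iterator is created, other than through that iterator, the iterator throws ConcurrentModificationException on a best-effort basis. Java serialization of a TreeMap walks its entries, so an add that lands during serialization can trigger the same exception. The sketch below is only an illustration of that behaviour, reproduced in a single thread so it is deterministic; the class name TreeMapFailFastDemo is hypothetical and it says nothing about where exactly the accumulator in this thread was modified.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical demo class showing TreeMap's fail-fast iteration.
public class TreeMapFailFastDemo {

    // Returns true if adding a key during iteration raises the exception.
    static boolean modifyWhileIterating() {
        TreeMap<String, Integer> counts = new TreeMap<>();
        counts.put("a", 1);
        counts.put("b", 2);
        counts.put("c", 3);
        try {
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                // Inserting a new key is a structural modification made
                // behind the back of the live iterator.
                counts.put(e.getKey() + "-copy", e.getValue());
            }
            return false;
        } catch (ConcurrentModificationException ex) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + modifyWhileIterating());
    }
}
```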


> On January 20, 2017 at 8:27 AM Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> My current assumption is that there is an accumulator that cannot be 
> serialized. The SortedStringAccumulator looks fine at first glance, but are 
> there other accumulators involved?
> Do you see a message like this one in the log of one of the TaskManagers
> 
> "Failed to serialize accumulators for task."
> 
> with an exception stack trace?
> 
> 
> Stephan
> 
> 
> 
> On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmar...@comcast.net> wrote:
> 
> > Stephan,
> > 
> > Thanks for looking at this. Could you elaborate on the misbehavior 
> > in the accumulator? I'd like to fix it if it's incorrect.
> > 
> > Dave
> > 
> > 
> > 
> > > On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
> > > 
> > > Hi!
> > > 
> > > It seems that the accumulator behaves in a non-standard way, 
> > > but the JobManager should also catch that (log a warning or debug 
> > > message) and simply continue (not crash).
> > > 
> > > I'll try to add a patch so that the JobManager tolerates these 
> > > kinds of issues in the accumulators.
> > > 
> > > Stephan
> > > 
> > > 
> > > On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmar...@comcast.net> wrote:
> > > 
> > > > Noticed I didn't cc the user list.
> > > > 
> > > > ---------- Original Message ----------
> > > From: Dave Marion <dlmar...@comcast.net>
> > > To: Ted Yu <yuzhih...@gmail.com>
> > > Date: January 19, 2017 at 12:13 PM
> > > Subject: Re: NPE in JobManager
> > > 
> > > 
> > > That might take some time. Here are the top N lines, typed by 
> > > hand. If that is not enough, let me know and I will start the process of 
> > > getting the full stack trace.
> > > 
> > > 
> > > NullPointerException
> > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> > > at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> > > at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
> > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> > > at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> > > at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> > > at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> > > at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> > > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> > > at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> > > at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

Re: Re: NPE in JobManager

2017-01-20 Thread Dave Marion
Stephan,

Thanks for looking at this. Could you elaborate on the misbehavior in the 
accumulator? I'd like to fix it if it's incorrect.

Dave


> On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
> 
> Hi!
> 
> It seems that the accumulator behaves in a non-standard way, but the 
> JobManager should also catch that (log a warning or debug message) and simply 
> continue (not crash).
> 
> I'll try to add a patch so that the JobManager tolerates these kinds of 
> issues in the accumulators.
> 
> Stephan
> 
> 
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmar...@comcast.net> wrote:
> 
> > Noticed I didn't cc the user list.
> > 
> > ---------- Original Message ----------
> From: Dave Marion <dlmar...@comcast.net>
> To: Ted Yu <yuzhih...@gmail.com>
> Date: January 19, 2017 at 12:13 PM
> Subject: Re: NPE in JobManager
> 
> 
> That might take some time. Here are the top N lines, typed by hand. If 
> that is not enough, let me know and I will start the process of getting the 
> full stack trace.
> 
> 
> NullPointerException
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
> at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 
> 
> > On January 19, 2017 at 11:58 AM Ted Yu <yuzhih...@gmail.com> wrote:
> > 
> > Can you pastebin the complete stack trace for the NPE?
> > 
> > Thanks
> > 
> > On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmar...@comcast.net> wrote:
> > 
> > > I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and have hit an 
> > > issue where, after 1 to 3 hours of running, the JobManager gets an NPE 
> > > and shuts itself down. The failure is at 
> > > JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm 
> > > using a custom accumulator[1], but I can't tell from the JobManager code 
> > > whether the issue is in my accumulator or is a bug in the JobManager.
> > > 
> > > 
> > > [1] 
> > > https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> > > 


 


Fwd: Re: NPE in JobManager

2017-01-19 Thread Dave Marion
Noticed I didn't cc the user list.

---------- Original Message ----------
From: Dave Marion <dlmar...@comcast.net>
To: Ted Yu <yuzhih...@gmail.com>
Date: January 19, 2017 at 12:13 PM
Subject: Re: NPE in JobManager


That might take some time. Here are the top N lines, typed by hand. If that is 
not enough, let me know and I will start the process of getting the full stack 
trace.


NullPointerException
at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)


On January 19, 2017 at 11:58 AM Ted Yu <yuzhih...@gmail.com> wrote:

Can you pastebin the complete stack trace for the NPE?

Thanks

On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmar...@comcast.net> wrote:

> 
> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and have hit an issue 
> where, after 1 to 3 hours of running, the JobManager gets an NPE and shuts 
> itself down. The failure is at 
> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a 
> custom accumulator[1], but I can't tell from the JobManager code whether the 
> issue is in my accumulator or is a bug in the JobManager.
> 
> 
> [1] 
> https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
> 
> 




NPE in JobManager

2017-01-19 Thread Dave Marion
I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and have hit an issue where, 
after 1 to 3 hours of running, the JobManager gets an NPE and shuts itself 
down. The failure is at 
JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a 
custom accumulator[1], but I can't tell from the JobManager code whether the 
issue is in my accumulator or is a bug in the JobManager.


[1] 
https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java