Hi!

My current assumption is that there is an accumulator that cannot be serialized. The SortedStringAccumulator looks fine at first glance, but are there other accumulators involved? Do you see a message like "Failed to serialize accumulators for task." with an exception stack trace in the log of one of the TaskManagers?

Stephan


On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmar...@comcast.net> wrote:

> Stephan,
>
> Thanks for looking at this. Could you elaborate on the misbehavior in the
> accumulator? I'd like to fix it if it's incorrect.
>
> Dave
>
>
> On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the
> JobManager should also catch that (log a warning or debug message) and
> simply continue (not crash).
>
> I'll try to add a patch so that the JobManager tolerates these kinds of
> issues in the accumulators.
>
> Stephan
>
>
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmar...@comcast.net> wrote:
>
>> Noticed I didn't cc the user list.
>>
>> ---------- Original Message ----------
>> From: Dave Marion <dlmar...@comcast.net>
>> To: Ted Yu <yuzhih...@gmail.com>
>> Date: January 19, 2017 at 12:13 PM
>> Subject: Re: NPE in JobManager
>>
>> That might take some time. Here are the top N lines, hand-typed. If that
>> is not enough, let me know and I will start the process of getting the
>> full stack trace.
>>
>> NullPointerException
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>> at scala.collection.mutable.ResizableArray$class.foreach(ArrayBuffer.scala:48)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>
>>
>> On January 19, 2017 at 11:58 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Can you pastebin the complete stack trace for the NPE?
>>
>> Thanks
>>
>> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmar...@comcast.net> wrote:
>>
>> I'm running flink-1.1.4-bin-hadoop27-scala_2.11, and after some period of
>> time (between 1 and 3 hours) the JobManager gets an NPE and shuts itself
>> down. The failure is at
>> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
>> a custom accumulator [1], but I can't tell from the JobManager code
>> whether the issue is in my accumulator or is a bug in the JobManager.
>>
>> [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
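One way to test the assumption that an accumulator cannot be serialized is to push an instance through a plain Java serialization round trip, outside of Flink. This is a minimal sketch, not Flink code: it assumes accumulator values travel via standard Java serialization (Flink's `Accumulator` interface extends `java.io.Serializable`), and `SortedStringsSketch`, `SerializationCheck`, and `roundTrip` are hypothetical names that only mimic the shape of a sorted-string accumulator. Running the same round trip on the real accumulator after adding a few values should fail (here wrapped in an `UncheckedIOException`) if some reachable field is not serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for a custom accumulator that collects strings in sorted order.
// It mirrors the add/getLocalValue/resetLocal shape but does NOT implement Flink's
// Accumulator interface, so it compiles without Flink on the classpath.
class SortedStringsSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    // TreeSet is Serializable; every field reachable from here must be too.
    private final TreeSet<String> values = new TreeSet<>();

    void add(String value) {
        values.add(value);
    }

    // Return a fresh list so callers never see or mutate the internal set.
    List<String> getLocalValue() {
        return new ArrayList<>(values);
    }

    void resetLocal() {
        values.clear();
    }
}

class SerializationCheck {
    // Writes the object with plain Java serialization (an assumption about the
    // transport) and reads it back. A non-serializable field makes writeObject throw
    // NotSerializableException, a subclass of IOException, rethrown here unchecked.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SortedStringsSketch acc = new SortedStringsSketch();
        acc.add("b");
        acc.add("a");
        SortedStringsSketch copy = roundTrip(acc);
        System.out.println(copy.getLocalValue()); // prints [a, b]
    }
}
```

A round trip that succeeds only shows that this particular state serializes; it is worth repeating after the accumulator has been merged or reset, since those paths can leave different objects behind.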
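The JobManager behavior Stephan describes (log a warning or debug message and continue instead of crashing) amounts to isolating each accumulator update so that one failure cannot abort the whole loop. Below is a minimal sketch of that pattern in plain Java, assuming each update can be modeled as a `Supplier` that either yields a value or throws; `TolerantAccumulatorUpdater`, `update`, `current`, and `skipped` are hypothetical names, and this is not the actual JobManager patch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch, not Flink's JobManager code: every accumulator update is
// isolated in its own try/catch so one faulty accumulator cannot abort the loop.
class TolerantAccumulatorUpdater {
    private static final Logger LOG =
            Logger.getLogger(TolerantAccumulatorUpdater.class.getName());

    // Latest successfully computed value per accumulator name (insertion order kept).
    private final Map<String, Object> current = new LinkedHashMap<>();
    // Names of accumulators whose update failed, in the order they were seen.
    private final List<String> skipped = new ArrayList<>();

    // Each Supplier stands in for "obtain the new value", e.g. deserializing bytes
    // reported by a task; that mapping is an assumption of this sketch.
    void update(Map<String, Supplier<Object>> updates) {
        for (Map.Entry<String, Supplier<Object>> entry : updates.entrySet()) {
            try {
                Object value = entry.getValue().get();
                if (value == null) {
                    // Treat a null result as a failure instead of storing it.
                    throw new IllegalStateException("accumulator produced null");
                }
                current.put(entry.getKey(), value);
            } catch (RuntimeException e) {
                // Log and continue rather than letting the exception escape.
                LOG.log(Level.WARNING, "Skipping accumulator " + entry.getKey(), e);
                skipped.add(entry.getKey());
            }
        }
    }

    Map<String, Object> current() {
        return current;
    }

    List<String> skipped() {
        return skipped;
    }
}

class TolerantUpdateDemo {
    public static void main(String[] args) {
        Map<String, Supplier<Object>> updates = new LinkedHashMap<>();
        updates.put("count", () -> 42);
        updates.put("broken", () -> { throw new NullPointerException("simulated"); });
        TolerantAccumulatorUpdater updater = new TolerantAccumulatorUpdater();
        updater.update(updates);
        System.out.println(updater.current() + " skipped=" + updater.skipped());
    }
}
```

With this shape, a `NullPointerException` from one accumulator is logged and recorded in `skipped()`, while the remaining accumulators are still updated.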