Hi!

My current assumption is that there is an accumulator that cannot be
serialized. The SortedStringAccumulator looks fine at first glance, but
are there other accumulators involved?

Do you see a message like the following, together with an exception stack
trace, in the log of one of the TaskManagers?

"Failed to serialize accumulators for task."
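
To rule out a serialization problem locally, a round-trip test along the
following lines may help. This is only a sketch using plain Java
serialization (Flink's Accumulator interface extends java.io.Serializable);
it does not reproduce the exact code path between TaskManager and
JobManager. The classes CountsAccumulator and BrokenAccumulator are
hypothetical stand-ins, not the real SortedStringAccumulator.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

public class AccumulatorSerializationCheck {

    /** Hypothetical stand-in for a custom accumulator with serializable state. */
    static class CountsAccumulator implements Serializable {
        private static final long serialVersionUID = 1L;
        final TreeMap<String, Integer> counts = new TreeMap<>();

        void add(String key) {
            counts.merge(key, 1, Integer::sum);
        }
    }

    /** Hypothetical stand-in that declares Serializable but holds a field that is not. */
    static class BrokenAccumulator implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object lock = new Object(); // java.lang.Object is not Serializable
    }

    /** Writes the value with Java serialization and reads it back. */
    static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    /** Returns true if the value survives a serialization round trip. */
    static boolean isSerializable(Object value) {
        try {
            roundTrip(value);
            return true;
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        CountsAccumulator ok = new CountsAccumulator();
        ok.add("a");
        System.out.println("CountsAccumulator serializable: " + isSerializable(ok));
        System.out.println("BrokenAccumulator serializable: "
            + isSerializable(new BrokenAccumulator()));
    }
}
```

Running the same check against an instance of each accumulator registered
in the job, after it has had some values added, would show whether any of
them fails in this way.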


Stephan



On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion <dlmar...@comcast.net> wrote:

> Stephan,
>
> Thanks for looking at this. Could you elaborate on the misbehavior in the
> accumulator? I'd like to fix it if it's incorrect.
>
> Dave
>
>
> On January 20, 2017 at 4:29 AM Stephan Ewen <se...@apache.org> wrote:
>
> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the
> JobManager should also catch that (log a warning or debug message) and
> simply continue (not crash).
>
> I'll try to add a patch so that the JobManager tolerates these kinds of
> issues in the accumulators.
>
> Stephan
>
>
> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion <dlmar...@comcast.net> wrote:
>
>> Noticed I didn't cc the user list.
>>
>> ---------- Original Message ----------
>> From: Dave Marion <dlmar...@comcast.net>
>> To: Ted Yu <yuzhih...@gmail.com>
>> Date: January 19, 2017 at 12:13 PM
>> Subject: Re: NPE in JobManager
>>
>> That might take some time. Here are the top N lines, typed by hand. If
>> that is not enough, let me know and I will start the process of getting
>> the full stack trace.
>>
>>
>> NullPointerException
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
>>
>> at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
>>
>> at scala.collection.mutable.ResizableArray$class.forEach(ArrayBuffer.scala:48)
>>
>> at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
>>
>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>
>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
>>
>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>
>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>
>>
>> On January 19, 2017 at 11:58 AM Ted Yu <yuzhih...@gmail.com> wrote:
>>
>> Can you pastebin the complete stack trace for the NPE?
>>
>> Thanks
>>
>> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion <dlmar...@comcast.net>
>> wrote:
>>
>> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an
>> issue where, after 1 to 3 hours, the JobManager gets an NPE and shuts
>> itself down. The failure is at
>> JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using
>> a custom accumulator [1], but I can't tell from the JobManager code
>> whether the issue is in my accumulator or is a bug in the JobManager.
>>
>>
>> [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java
>>
>>
>>
>
>
>
