Re: Re: NPE in JobManager
Fixing my accumulator did the trick. I should note that the JobManager did not fail when I ran this previously against Flink 1.1.3. Thanks for the help!

Dave

> On January 20, 2017 at 8:45 AM Dave Marion wrote:
>
> I do see that message in one of the task manager logs 20ms before the NPE in the JobManager. Looking in that log, there is a ConcurrentModificationException in TreeMap, which my accumulator uses. I'll track this down, thanks for the pointer.
Re: Re: NPE in JobManager
I do see that message in one of the task manager logs 20ms before the NPE in the JobManager. Looking in that log, there is a ConcurrentModificationException in TreeMap, which my accumulator uses. I'll track this down, thanks for the pointer.

> On January 20, 2017 at 8:27 AM Stephan Ewen wrote:
>
> Hi!
>
> My current assumption is that there is an accumulator that cannot be serialized. The SortedStringAccumulator looks fine at first glance, but are there other accumulators involved?
>
> Do you see a message like this one in the log of one of the TaskManagers
>
> "Failed to serialize accumulators for task."
>
> with an exception stack trace?
>
> Stephan
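A note on the ConcurrentModificationException mentioned above: TreeMap iterators are fail-fast, and serializing a TreeMap iterates over its entries, so a structural change made by another thread during serialization can raise this exception. The following is a minimal Java sketch under that assumption. `GuardedSortedCounter` and `TreeMapCmeDemo` are hypothetical names used for illustration only; this is not the SortedStringAccumulator from the thread, and it is not necessarily the fix that was applied there.

```java
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for a TreeMap-backed accumulator (not the code from the thread). */
class GuardedSortedCounter {
    private final TreeMap<String, Integer> counts = new TreeMap<>();

    /** Called by the thread that produces values. */
    synchronized void add(String key) {
        counts.merge(key, 1, Integer::sum);
    }

    /** Returns a private copy that can be serialized while add() keeps running. */
    synchronized TreeMap<String, Integer> snapshot() {
        return new TreeMap<>(counts);
    }
}

class TreeMapCmeDemo {
    /** Triggers the fail-fast check deterministically: a structural change during iteration. */
    static boolean unguardedIterationFails() {
        TreeMap<String, Integer> map = new TreeMap<>();
        map.put("a", 1);
        map.put("b", 2);
        try {
            for (Map.Entry<String, Integer> e : map.entrySet()) {
                map.put("c" + e.getKey(), 3); // inserts a new key mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("unguarded iteration fails: " + unguardedIterationFails());
        GuardedSortedCounter counter = new GuardedSortedCounter();
        counter.add("x");
        counter.add("x");
        System.out.println("snapshot: " + counter.snapshot());
    }
}
```

The single-threaded loop only reproduces the check deterministically; in the multi-threaded case the same check can fire intermittently, which would be consistent with a failure that shows up only after hours of running. Copying under a lock is one option among several (a concurrent sorted map would be another).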
Re: Re: NPE in JobManager
Hi!

My current assumption is that there is an accumulator that cannot be serialized. The SortedStringAccumulator looks fine at first glance, but are there other accumulators involved?

Do you see a message like this one in the log of one of the TaskManagers

"Failed to serialize accumulators for task."

with an exception stack trace?

Stephan

> On Fri, Jan 20, 2017 at 2:10 PM, Dave Marion wrote:
>
> Stephan,
>
> Thanks for looking at this. Could you elaborate on the misbehavior in the accumulator? I'd like to fix it if it's incorrect.
>
> Dave
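To check locally whether an accumulator's state can be serialized at all, one option is to push it through plain Java serialization and inspect the result. This is a minimal sketch that assumes the state is shipped with standard Java serialization; `SerializationProbe`, `trySerialize`, and `BadState` are hypothetical names, not Flink APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.TreeMap;

class SerializationProbe {
    /** Returns null if the value serializes cleanly, otherwise the exception that was thrown. */
    static Exception trySerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return null;
        } catch (IOException | RuntimeException e) {
            return e;
        }
    }

    /** Hypothetical accumulator state holding a field that is not Serializable. */
    static class BadState implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object lock = new Object(); // java.lang.Object does not implement Serializable
    }

    public static void main(String[] args) {
        System.out.println("TreeMap:  " + trySerialize(new TreeMap<String, Integer>()));
        System.out.println("BadState: " + trySerialize(new BadState()));
    }
}
```

A probe like this only catches state that can never be serialized. A failure caused by concurrent modification is timing-dependent and may pass such a check.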
Re: Re: NPE in JobManager
Stephan,

Thanks for looking at this. Could you elaborate on the misbehavior in the accumulator? I'd like to fix it if it's incorrect.

Dave

> On January 20, 2017 at 4:29 AM Stephan Ewen wrote:
>
> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the JobManager should also catch that (log a warning or debug message) and simply continue (not crash).
>
> I'll try to add a patch that the JobManager tolerates these kinds of issues in the accumulators.
>
> Stephan
Re: Re: NPE in JobManager
I opened this issue: https://issues.apache.org/jira/browse/FLINK-5585

Assuming the bug is what I think it is (I cannot be 100% sure from just the small stack trace sample), it should be fixed soon.

> On Fri, Jan 20, 2017 at 10:29 AM, Stephan Ewen wrote:
>
> Hi!
>
> It seems that the accumulator behaves in a non-standard way, but the JobManager should also catch that (log a warning or debug message) and simply continue (not crash).
>
> I'll try to add a patch that the JobManager tolerates these kinds of issues in the accumulators.
>
> Stephan
Re: Re: NPE in JobManager
Hi!

It seems that the accumulator behaves in a non-standard way, but the JobManager should also catch that (log a warning or debug message) and simply continue (not crash).

I'll try to add a patch so that the JobManager tolerates these kinds of issues in the accumulators.

Stephan

> On Thu, Jan 19, 2017 at 7:26 PM, Dave Marion wrote:
>
> Noticed I didn't cc the user list.
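The "catch that and simply continue" behavior described above can be sketched as a merge loop that skips a bad report instead of letting the exception escape. This is an illustration in Java under stated assumptions, not the JobManager code (which is Scala) and not the actual patch: `TolerantAccumulatorUpdate`, `Snapshot`, and `applyAll` are hypothetical names, and modeling a failed serialization as a null payload is a guess at the failure mode.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TolerantAccumulatorUpdate {
    /** Hypothetical report from one task: a task id plus a possibly-null payload. */
    static final class Snapshot {
        final String taskId;
        final Map<String, Long> values; // null models a report whose serialization failed

        Snapshot(String taskId, Map<String, Long> values) {
            this.taskId = taskId;
            this.values = values;
        }
    }

    /** Merges every usable report into totals and returns the ids of the reports it skipped. */
    static List<String> applyAll(List<Snapshot> snapshots, Map<String, Long> totals) {
        List<String> skipped = new ArrayList<>();
        for (Snapshot s : snapshots) {
            if (s == null) {
                skipped.add("<unknown>");
                continue;
            }
            if (s.values == null) {
                skipped.add(s.taskId); // a real system would log a warning here
                continue;
            }
            try {
                s.values.forEach((name, value) -> totals.merge(name, value, Long::sum));
            } catch (RuntimeException e) {
                // Tolerate a malformed report; entries merged before the failure stay merged.
                skipped.add(s.taskId);
            }
        }
        return skipped;
    }

    public static void main(String[] args) {
        Map<String, Long> totals = new HashMap<>();
        List<Snapshot> reports = new ArrayList<>();
        reports.add(new Snapshot("t1", Map.of("records", 2L)));
        reports.add(new Snapshot("t2", null));
        System.out.println("skipped=" + applyAll(reports, totals) + " totals=" + totals);
    }
}
```

The design choice here is that one task's bad report costs only that task's update, while the loop and the process that runs it keep going.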
Fwd: Re: NPE in JobManager
Noticed I didn't cc the user list.

-- Original Message --
From: Dave Marion
To: Ted Yu
Date: January 19, 2017 at 12:13 PM
Subject: Re: NPE in JobManager

That might take some time. Here are the top N lines, typed by hand. If that is not enough, let me know and I will start the process of getting the full stack trace.

NullPointerException
    at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790)
    at JobManager$$updateAccumulators$1.apply(JobManager.scala:1788)
    at scala.collection.mutable.ResizableArray$class.forEach(ArrayBuffer.scala:48)
    at scala.collection.mutable.ArrayBuffer.forEach(ArrayBuffer.scala:48)
    at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$updateAccumulators(JobManager.scala:1788)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:967)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:44)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
    at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)

> On January 19, 2017 at 11:58 AM Ted Yu wrote:
>
> Can you pastebin the complete stack trace for the NPE?
>
> Thanks
Re: NPE in JobManager
Can you pastebin the complete stack trace for the NPE?

Thanks

> On Thu, Jan 19, 2017 at 8:57 AM, Dave Marion wrote:
>
> I'm running flink-1.1.4-bin-hadoop27-scala_2.11 and I'm running into an issue where, after some period of time (1 to 3 hours), the JobManager gets an NPE and shuts itself down. The failure is at JobManager$$updateAccumulators$1.apply(JobManager.scala:1790). I'm using a custom accumulator [1], but can't tell from the JobManager code whether the issue is in my accumulator or is a bug in the JobManager.
>
> [1] https://github.com/NationalSecurityAgency/timely/blob/master/analytics/src/main/java/timely/analytics/flink/SortedStringAccumulator.java