Re: updateStateByKey not persisting in Spark 1.5.1
I searched for checkpoint-related methods in the various Listener classes but haven't found any. Analyzing the DAG is tedious and fragile, since the DAG may change in future Spark releases.

Cheers

On Thu, Jan 21, 2016 at 8:25 AM, Brian London wrote:
> Thanks. It looks like extending my batch duration to 7 seconds is a
> work-around. I'd like to build a check for the lack of checkpointing in
> our integration tests. Is there a way to parse the DAG at runtime?
Re: updateStateByKey not persisting in Spark 1.5.1
Thanks. It looks like extending my batch duration to 7 seconds is a workaround. I'd like to build a check for the lack of checkpointing into our integration tests. Is there a way to parse the DAG at runtime?

On Wed, Jan 20, 2016 at 2:01 PM Ted Yu wrote:
> This is related:
>
> SPARK-6847
>
> FYI
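One possible shape for the integration-test check asked about above is to measure how deep each batch's RDD lineage is, rather than parsing the DAG visualization. The following is only a sketch under assumptions: `LineageCheck`, `lineageDepth`, `failOnDeepLineage`, and the `maxDepth` threshold are hypothetical names introduced for illustration. It relies on the public `RDD.dependencies` and `DStream.foreachRDD` calls, and it has not been run against the job described in this thread.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper, not part of Spark.
object LineageCheck {

  /** Length of the longest parent chain reachable from `rdd`, counting `rdd` itself. */
  def lineageDepth(rdd: RDD[_]): Int = {
    // Walk the lineage level by level instead of recursively, so the check
    // itself cannot overflow the stack on a very deep lineage.
    var frontier: Seq[RDD[_]] = Seq(rdd)
    var depth = 0
    while (frontier.nonEmpty) {
      depth += 1
      // RDD.dependencies takes checkpointing into account: once an RDD has
      // been checkpointed it no longer reports its original parents.
      frontier = frontier.flatMap(r => r.dependencies.map(d => d.rdd: RDD[_])).distinct
    }
    depth
  }

  /** Registers an output operation that fails once a batch's lineage is deeper than `maxDepth`. */
  def failOnDeepLineage[T](stream: DStream[T], maxDepth: Int): Unit =
    stream.foreachRDD { rdd =>
      val depth = lineageDepth(rdd)
      require(depth <= maxDepth,
        s"lineage depth $depth exceeds $maxDepth; checkpointing may not be truncating it")
    }
}
```

In a test, `LineageCheck.failOnDeepLineage(stateStream, maxDepth)` would be called before `ssc.start()`, with `maxDepth` chosen by observing a healthy run. Note that this adds an output operation to the stream, so it is better kept out of production code, and the depth of a healthy lineage may itself change between Spark releases.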
Re: updateStateByKey not persisting in Spark 1.5.1
This is related:

SPARK-6847

FYI

On Wed, Jan 20, 2016 at 7:55 AM, Brian London wrote:
> I'm running a streaming job that has two calls to updateStateByKey. When
> run in standalone mode both calls to updateStateByKey behave as expected.
> When run on a cluster, however, it appears that the first call is not being
> checkpointed as shown in this DAG image:
>
> http://i.imgur.com/zmQ8O2z.png
>
> [...]
Re: updateStateByKey not persisting in Spark 1.5.1
Could you share your log?

On Wed, Jan 20, 2016 at 7:55 AM, Brian London wrote:
> I'm running a streaming job that has two calls to updateStateByKey. When
> run in standalone mode both calls to updateStateByKey behave as expected.
> When run on a cluster, however, it appears that the first call is not being
> checkpointed as shown in this DAG image:
>
> http://i.imgur.com/zmQ8O2z.png
>
> [...]
updateStateByKey not persisting in Spark 1.5.1
I'm running a streaming job that has two calls to updateStateByKey. When run in standalone mode, both calls to updateStateByKey behave as expected. When run on a cluster, however, it appears that the first call is not being checkpointed, as shown in this DAG image:

http://i.imgur.com/zmQ8O2z.png

The middle column continues to grow one level deeper every batch until I get a stack overflow error. I'm guessing it's a problem of the stateRDD not being persisted, but I can't imagine why it wouldn't be. I thought updateStateByKey was supposed to just handle that for you internally.

Any ideas?

I've included stack trace excerpts of the stack overflow below, in case anyone is interested:

Job aborted due to stage failure: Task 7 in stage 195811.0 failed 4 times, most recent failure: Lost task 7.3 in stage 195811.0 (TID 213529, ip-10-168-177-216.ec2.internal): java.lang.StackOverflowError
    at java.lang.Exception.<init>(Exception.java:102)
    at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
    at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
    at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    ...

And

    scala.collection.immutable.$colon$colon in readObject at line 362
    scala.collection.immutable.$colon$colon in readObject at line 366
    scala.collection.immutable.$colon$colon in readObject at line 362
    scala.collection.immutable.$colon$colon in readObject at line 362
    scala.collection.immutable.$colon$colon in readObject at line 366
    scala.collection.immutable.$colon$colon in readObject at line 362
    scala.collection.immutable.$colon$colon in readObject at line 362
    ...
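For readers following along, here is a minimal sketch of a stateful stream with the checkpoint directory and the checkpoint interval set explicitly, rather than left to the defaults. Everything specific in it is an assumption made for illustration: the `StatefulCounts` object, the socket source on `localhost:9999`, the 2-second batch duration, the 10-second checkpoint interval, and the HDFS path. It is not the job from this thread, and whether an explicit interval avoids the behavior tracked in SPARK-6847 is not established here.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical example job, not the one discussed in the thread.
object StatefulCounts {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("stateful-counts-sketch")
    // Batch duration of 2 seconds is an assumption for this sketch.
    val ssc = new StreamingContext(conf, Seconds(2))

    // updateStateByKey requires a checkpoint directory. On a cluster it should
    // live on a filesystem every node can reach; this path is hypothetical.
    ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

    val words = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Running count per key: add this batch's values to the previous state.
    val update: (Seq[Int], Option[Int]) => Option[Int] =
      (values, state) => Some(values.sum + state.getOrElse(0))

    val counts = words.updateStateByKey[Int](update)

    // Explicit checkpoint interval for the state stream; it must be a
    // multiple of the batch duration (10s is a multiple of 2s).
    counts.checkpoint(Seconds(10))

    counts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

When no interval is set, the Spark Streaming documentation describes the default for stateful transformations as a multiple of the batch interval that is at least 10 seconds, so changing the batch duration also changes how often state RDDs are checkpointed.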