Thanks. It looks like extending my batch duration to 7 seconds is a
work-around.  I'd like to build a check for the lack of checkpointing in
our integration tests.  Is there a way to parse the DAG at runtime?
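
One possible shape for such a check, as a minimal sketch rather than a
tested recipe: walk `RDD.dependencies` for each batch's state RDD and fail
if the lineage keeps growing. `LineageCheck`, `lineageDepth`,
`assertBoundedLineage` and `maxDepth` are hypothetical names, not Spark
APIs. The sketch assumes that a materialized checkpoint replaces an RDD's
parents, so the depth stays small when checkpointing works.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream
import scala.collection.mutable

// Hypothetical test helper; not part of Spark.
object LineageCheck {

  // Number of breadth-first levels in the lineage of `rdd`, counting `rdd`
  // itself as level 1. Walked with a loop rather than recursion so the
  // check cannot itself overflow the stack on a very deep lineage.
  def lineageDepth(rdd: RDD[_]): Int = {
    val seen = mutable.Set[Int](rdd.id)   // RDD ids already visited
    var frontier: Seq[RDD[_]] = Seq(rdd)
    var depth = 0
    while (frontier.nonEmpty) {
      depth += 1
      frontier = frontier
        .flatMap(r => r.dependencies.map(d => (d.rdd: RDD[_])))
        .filter(r => seen.add(r.id))      // keep only RDDs not seen before
    }
    depth
  }

  // Registers an output operation that fails the batch when the lineage of
  // the stream's RDD reaches `maxDepth` levels. The threshold is an
  // assumption to tune per job.
  def assertBoundedLineage[T](stream: DStream[T], maxDepth: Int): Unit =
    stream.foreachRDD { rdd =>
      val depth = lineageDepth(rdd)
      assert(depth < maxDepth, s"lineage depth $depth reached limit $maxDepth")
    }
}
```

In a test this would be called on the stream returned by each
updateStateByKey call before starting the StreamingContext, for example
`LineageCheck.assertBoundedLineage(stateStream, 50)`, where `stateStream`
and the limit of 50 are placeholders.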

On Wed, Jan 20, 2016 at 2:01 PM Ted Yu <yuzhih...@gmail.com> wrote:

> This is related:
>
> SPARK-6847
>
> FYI
>
> On Wed, Jan 20, 2016 at 7:55 AM, Brian London <brianmlon...@gmail.com>
> wrote:
>
>> I'm running a streaming job that has two calls to updateStateByKey.  When
>> run in standalone mode both calls to updateStateByKey behave as expected.
>> When run on a cluster, however, it appears that the first call is not being
>> checkpointed as shown in this DAG image:
>>
>> http://i.imgur.com/zmQ8O2z.png
>>
>> The middle column continues to grow one level deeper every batch until I
>> get a stack overflow error.  I'm guessing it's a problem of the stateRDD
>> not being persisted, but I can't imagine why it wouldn't be.  I thought
>> updateStateByKey was supposed to just handle that for you internally.
>>
>> Any ideas?
>>
>> I'll post stack trace excerpts of the stack overflow below in case
>> anyone is interested:
>>
>> Job aborted due to stage failure: Task 7 in stage 195811.0 failed 4
>> times, most recent failure: Lost task 7.3 in stage 195811.0 (TID 213529,
>> ip-10-168-177-216.ec2.internal): java.lang.StackOverflowError
>>   at java.lang.Exception.<init>(Exception.java:102)
>>   at java.lang.ReflectiveOperationException.<init>(ReflectiveOperationException.java:89)
>>   at java.lang.reflect.InvocationTargetException.<init>(InvocationTargetException.java:72)
>>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>   at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>   at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>>   at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
>>   at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
>>   at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>>   ...
>>
>> And
>>
>> scala.collection.immutable.$colon$colon in readObject at line 362
>> scala.collection.immutable.$colon$colon in readObject at line 366
>> scala.collection.immutable.$colon$colon in readObject at line 362
>> scala.collection.immutable.$colon$colon in readObject at line 362
>> scala.collection.immutable.$colon$colon in readObject at line 366
>> scala.collection.immutable.$colon$colon in readObject at line 362
>> scala.collection.immutable.$colon$colon in readObject at line 362
>> ...
>>
>>
