My intention is to make it compatible! I've filed a bug to track this:
https://issues.apache.org/jira/browse/SPARK-11932
I'm looking into it right now. Thanks for testing it out and reporting this!


On Mon, Nov 23, 2015 at 7:22 AM, jan <j...@insidin.com> wrote:

> Hi guys,
>
> I'm trying out the new trackStateByKey API in the Spark-1.6.0-preview2
> release, and I'm encountering an exception when trying to restore previously
> checkpointed state in Spark Streaming.
>
> Use case:
> - execute a stateful Spark streaming job using trackStateByKey
> - interrupt / kill the job
> - start the job again (without any code changes or cleaning out the
> checkpoint directory)
>
> Upon this restart, I encounter the exception below. The nature of the
> exception makes me think that either I'm doing something wrong or there's a
> problem with this use case in the new trackStateByKey API.
>
> I uploaded my job code (
> https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's
> basically just a modified version of the Spark Streaming example
> StatefulNetworkWordCount (which had already been updated to use
> trackStateByKey). My version, however, adds the use of
> StreamingContext.getOrCreate to actually read the checkpointed state when
> the job is started, which leads to the exception below.
>
> Just to make sure: using StreamingContext.getOrCreate should still be
> compatible with the trackStateByKey API, right?
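>
> In case it helps, here's a minimal sketch of the pattern I'm using (the
> checkpoint path, socket source, and tracking function below are simplified
> placeholders; the full code is in the gist above):
>
>   import org.apache.spark.SparkConf
>   import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext, Time}
>
>   val checkpointDir = "/tmp/track-state-checkpoint"  // placeholder path
>
>   def createContext(): StreamingContext = {
>     val conf = new SparkConf().setAppName("StatefulNetworkWordCount")
>     val ssc = new StreamingContext(conf, Seconds(1))
>     ssc.checkpoint(checkpointDir)
>
>     val words = ssc.socketTextStream("localhost", 9999)
>       .flatMap(_.split(" ")).map((_, 1))
>
>     // keep a running count per word in Spark's key-value state
>     val trackStateFunc = (time: Time, word: String, one: Option[Int], state: State[Int]) => {
>       val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
>       state.update(sum)
>       Some((word, sum))
>     }
>     words.trackStateByKey(StateSpec.function(trackStateFunc)).print()
>     ssc
>   }
>
>   // restores the context (and its state) from the checkpoint if one
>   // exists, otherwise builds a fresh context
>   val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
>   ssc.start()
>   ssc.awaitTermination()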
>
> Thanks,
> Jan
>
> 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
> marking it as stopped
> java.lang.IllegalArgumentException: requirement failed
>     at scala.Predef$.require(Predef.scala:221)
>     at org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)
>     at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)
>     at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)
>     at scala.Option.map(Option.scala:145)
>     at org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>     at scala.Option.orElse(Option.scala:257)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>     at org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>     at scala.Option.orElse(Option.scala:257)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
>     at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
>     at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
>     at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605)
>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599)
>     at org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48)
>     at org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
