My intention is to make it compatible. I filed this bug — https://issues.apache.org/jira/browse/SPARK-11932 — and am looking into it right now. Thanks for testing it out and reporting this!
On Mon, Nov 23, 2015 at 7:22 AM, jan <j...@insidin.com> wrote:
> Hi guys,
>
> I'm trying out the new trackStateByKey API of the Spark-1.6.0-preview2
> release and I'm encountering an exception when trying to restore previously
> checkpointed state in Spark streaming.
>
> Use case:
> - execute a stateful Spark streaming job using trackStateByKey
> - interrupt / kill the job
> - start the job again (without any code changes or cleaning out the
>   checkpoint directory)
>
> Upon this restart, I encounter the exception below. The nature of the
> exception makes me think either I am doing something wrong, or there's a
> problem with this use case for the new trackStateByKey API.
>
> I uploaded my job code
> (https://gist.github.com/juyttenh/be7973b0c5c2eddd8a81), but it's
> basically just a modified version of the Spark streaming example
> StatefulNetworkWordCount (which had already been updated to use
> trackStateByKey). My version, however, also uses
> StreamingContext.getOrCreate to actually read the checkpointed state when
> the job is started, leading to the exception below.
>
> Just to make sure: using StreamingContext.getOrCreate should still be
> compatible with using the trackStateByKey API?
> Thanks,
> Jan
>
> 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context,
> marking it as stopped
> java.lang.IllegalArgumentException: requirement failed
>     at scala.Predef$.require(Predef.scala:221)
>     at org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133)
>     at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148)
>     at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143)
>     at scala.Option.map(Option.scala:145)
>     at org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>     at scala.Option.orElse(Option.scala:257)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>     at org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350)
>     at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349)
>     at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>     at scala.Option.orElse(Option.scala:257)
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231)
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226)
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>     at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226)
>     at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96)
>     at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
>     at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605)
>     at ... run in separate thread using org.apache.spark.util.ThreadUtils ... ()
>     at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605)
>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599)
>     at org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48)
>     at org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
>     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727)
>     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
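For anyone following along, the restart pattern being described is roughly the one below. This is only a minimal sketch of the getOrCreate-plus-trackStateByKey pattern against the 1.6.0-preview2 API, modeled on StatefulNetworkWordCount; the checkpoint directory, host, and port are placeholders, and it is not a claim about what Jan's gist contains. It needs a Spark cluster (and something like netcat on port 9999) to actually run.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object CheckpointedWordCount {
  // Hypothetical checkpoint location; in practice this should be a
  // reliable filesystem such as HDFS.
  val checkpointDir = "/tmp/spark-checkpoint"

  // Builds a fresh context. Only called when no checkpoint exists yet.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("CheckpointedWordCount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)

    // Running count per word, kept in Spark-managed state.
    val trackStateFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }

    val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))
    words.map((_, 1))
      .trackStateByKey(StateSpec.function(trackStateFunc))
      .print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a restart, getOrCreate recovers the context (and state) from the
    // checkpoint instead of calling createContext -- the step on which the
    // reported IllegalArgumentException is thrown.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```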