[ https://issues.apache.org/jira/browse/SPARK-11932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-11932: ------------------------------------- Priority: Critical (was: Blocker) > trackStateByKey throws java.lang.IllegalArgumentException: requirement failed > on restarting from checkpoint > ----------------------------------------------------------------------------------------------------------- > > Key: SPARK-11932 > URL: https://issues.apache.org/jira/browse/SPARK-11932 > Project: Spark > Issue Type: Bug > Components: Streaming > Reporter: Tathagata Das > Assignee: Tathagata Das > Priority: Critical > > The problem is that when recovering a streaming application using > trackStateByKey from Dstream checkpoints, there is the following exception. > Code > {code} > StreamingContext.getOrCreate(".", () => createContext(args)) > ... > def createContext(args: Array[String]) : StreamingContext = { > val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") > // Create the context with a 1 second batch size > val ssc = new StreamingContext(sparkConf, Seconds(1)) > > ssc.checkpoint(".") > // Initial RDD input to trackStateByKey > val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), > ("world", 1))) > // Create a ReceiverInputDStream on target ip:port and count the > // words in input stream of \n delimited test (eg. generated by 'nc') > val lines = ssc.socketTextStream(args(0), args(1).toInt) > val words = lines.flatMap(_.split(" ")) > val wordDstream = words.map(x => (x, 1)) > // Update the cumulative count using updateStateByKey > // This will give a DStream made of state (which is the cumulative count > of the words) > val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], > state: State[Int]) => { > val sum = one.getOrElse(0) + state.getOption.getOrElse(0) > val output = (word, sum) > state.update(sum) > Some(output) > } > val stateDstream = wordDstream.trackStateByKey( > StateSpec.function(trackStateFunc).initialState(initialRDD)) > stateDstream.print() > > ssc > > } > {code} > Error > {code} > 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context, marking > it as stopped > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:221) > at > org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133) > at > org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148) > at > org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143) > at scala.Option.map(Option.scala:145) > at > org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > at scala.Option.orElse(Option.scala:257) > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > at > org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349 > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > at scala.Option.orElse(Option.scala:257) > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scla:47) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at > org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226 > at > org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96) > at > org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83) > at > org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609) > at > org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) > at > org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) > at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599) > at > org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48) > at > org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:483) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > The reason is that TrackStateRDDs generated by trackStateByKey expect the > previous batch's TrackStateRDDs to have a partitioner. However, when recovery > from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have > a partitioner attached to it. This is because RDD checkpoints do not preserve > the partitioner (SPARK-12004). -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org