[ https://issues.apache.org/jira/browse/SPARK-11932?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tathagata Das updated SPARK-11932: ---------------------------------- Description: The problem is that when recovering a streaming application using trackStateByKey from Dstream checkpoints, there is the following exception. Code {code} StreamingContext.getOrCreate(".", () => createContext(args)) ... def createContext(args: Array[String]) : StreamingContext = { val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Initial RDD input to trackStateByKey val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using updateStateByKey // This will give a DStream made of state (which is the cumulative count of the words) val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) Some(output) } val stateDstream = wordDstream.trackStateByKey( StateSpec.function(trackStateFunc).initialState(initialRDD)) stateDstream.print() ssc } {code} Error {code} 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133) at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148) at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scla:47) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226 at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599) at org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48) at org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} The reason is that TrackStateRDDs generated by trackStateByKey expect the previous batch's TrackStateRDDs to have a partitioner. However, when recovery from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have a partitioner attached to it. This is because RDD checkpoints do not preserve the partitioner (SPARK-12004). was: The problem is that Code {code} StreamingContext.getOrCreate(".", () => createContext(args)) ... def createContext(args: Array[String]) : StreamingContext = { val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") // Create the context with a 1 second batch size val ssc = new StreamingContext(sparkConf, Seconds(1)) ssc.checkpoint(".") // Initial RDD input to trackStateByKey val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), ("world", 1))) // Create a ReceiverInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') val lines = ssc.socketTextStream(args(0), args(1).toInt) val words = lines.flatMap(_.split(" ")) val wordDstream = words.map(x => (x, 1)) // Update the cumulative count using updateStateByKey // This will give a DStream made of state (which is the cumulative count of the words) val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], state: State[Int]) => { val sum = one.getOrElse(0) + state.getOption.getOrElse(0) val output = (word, sum) state.update(sum) Some(output) } val stateDstream = wordDstream.trackStateByKey( StateSpec.function(trackStateFunc).initialState(initialRDD)) stateDstream.print() ssc } {code} Error {code} 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context, marking it as stopped java.lang.IllegalArgumentException: requirement failed at scala.Predef$.require(Predef.scala:221) at org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133) at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148) at org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143) at scala.Option.map(Option.scala:145) at org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349 at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scla:47) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226 at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) at org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599) at org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48) at org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) {code} > trackStateByKey throws java.lang.IllegalArgumentException: requirement failed > on restarting from checkpoint > ----------------------------------------------------------------------------------------------------------- > > Key: SPARK-11932 > URL: https://issues.apache.org/jira/browse/SPARK-11932 > Project: Spark > Issue Type: Bug > Components: Streaming > Reporter: Tathagata Das > Assignee: Tathagata Das > Priority: Blocker > > The problem is that when recovering a streaming application using > trackStateByKey from Dstream checkpoints, there is the following exception. > Code > {code} > StreamingContext.getOrCreate(".", () => createContext(args)) > ... > def createContext(args: Array[String]) : StreamingContext = { > val sparkConf = new SparkConf().setAppName("StatefulNetworkWordCount") > // Create the context with a 1 second batch size > val ssc = new StreamingContext(sparkConf, Seconds(1)) > > ssc.checkpoint(".") > // Initial RDD input to trackStateByKey > val initialRDD = ssc.sparkContext.parallelize(List(("hello", 1), > ("world", 1))) > // Create a ReceiverInputDStream on target ip:port and count the > // words in input stream of \n delimited test (eg. generated by 'nc') > val lines = ssc.socketTextStream(args(0), args(1).toInt) > val words = lines.flatMap(_.split(" ")) > val wordDstream = words.map(x => (x, 1)) > // Update the cumulative count using updateStateByKey > // This will give a DStream made of state (which is the cumulative count > of the words) > val trackStateFunc = (batchTime: Time, word: String, one: Option[Int], > state: State[Int]) => { > val sum = one.getOrElse(0) + state.getOption.getOrElse(0) > val output = (word, sum) > state.update(sum) > Some(output) > } > val stateDstream = wordDstream.trackStateByKey( > StateSpec.function(trackStateFunc).initialState(initialRDD)) > stateDstream.print() > > ssc > > } > {code} > Error > {code} > 15/11/23 10:55:07 ERROR StreamingContext: Error starting the context, marking > it as stopped > java.lang.IllegalArgumentException: requirement failed > at scala.Predef$.require(Predef.scala:221) > at > org.apache.spark.streaming.rdd.TrackStateRDD.<init>(TrackStateRDD.scala:133) > at > org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:148) > at > org.apache.spark.streaming.dstream.InternalTrackStateDStream$$anonfun$compute$2.apply(TrackStateDStream.scala:143) > at scala.Option.map(Option.scala:145) > at > org.apache.spark.streaming.dstream.InternalTrackStateDStream.compute(TrackStateDStream.scala:143) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > at scala.Option.orElse(Option.scala:257) > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > at > org.apache.spark.streaming.dstream.TrackStateDStreamImpl.compute(TrackStateDStream.scala:66) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:350) > at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349 > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:349) > at > org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:424) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > at scala.Option.orElse(Option.scala:257) > at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scla:47) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:231) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:226) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at > org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:226 > at > org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:96) > at > org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:83) > at > org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply$mcV$sp(StreamingContext.scala:609) > at > org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) > at > org.apache.spark.streaming.StreamingContext$$anonfun$liftedTree1$1$1.apply(StreamingContext.scala:605) > at ... run in separate thread using org.apache.spark.util.ThreadUtils ... () > at > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:605) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:599) > at > org.apache.spark.examples.streaming.StatefulNetworkWordCount$.main(StatefulNetworkWordCount.scala:48) > at > org.apache.spark.examples.streaming.StatefulNetworkWordCount.main(StatefulNetworkWordCount.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:483) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:727) > at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > The reason is that TrackStateRDDs generated by trackStateByKey expect the > previous batch's TrackStateRDDs to have a partitioner. However, when recovery > from DStream checkpoints, the RDDs recovered from RDD checkpoints do not have > a partitioner attached to it. This is because RDD checkpoints do not preserve > the partitioner (SPARK-12004). -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org