[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534078#comment-14534078 ] Tathagata Das commented on SPARK-6770: -- Was this problem solved? I think I discuss this explicitly in the Streaming guide here: http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations If this solves the issue, I am inclined to close this JIRA. Either way, this is not a problem with DirectKafkaInputDStream as this JIRA title seems to indicate. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovered the program from the checkpoint, I encountered an exception: 
{code} Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at 
org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534209#comment-14534209 ] yangping wu commented on SPARK-6770: Hi [~tdas], I used the code you mentioned, and it successfully recovered from the checkpoint. It solves the issue. Thank you. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovered the program from the checkpoint, I encountered an exception: {code} Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at 
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at 
logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534240#comment-14534240 ] Tathagata Das commented on SPARK-6770: -- Awesome! I am closing this JIRA then! DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovered the program from the checkpoint, I encountered an exception: {code} Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at 
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at 
logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486794#comment-14486794 ] yangping wu commented on SPARK-6770: OK, thank you very much for your reply. I will try to use a pure Spark Streaming program and plain Scala JDBC to write the data to MySQL. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovered the program from the checkpoint, I encountered an exception: {code} Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at 
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at 
logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597)
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486782#comment-14486782 ] Saisai Shao commented on SPARK-6770: From my understanding, the basic scenario of your code is putting the Kafka data into a database using JDBC, and you want to leverage SparkSQL for an easy implementation. I think if you want to use the checkpoint file to recover from driver failure, it would be better to write a pure Spark Streaming program; Spark Streaming's checkpointing mechanism only guarantees that streaming-related metadata is written and recovered. The more third-party tools you use, the less the current mechanism can recover. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", 
overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovery the program from checkpoint, I encountered an exception: {code} Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at 
org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14487103#comment-14487103 ] Sean Owen commented on SPARK-6770: -- That may be so, but it's not obvious that you simply can't use Spark SQL with Streaming recovery. For example, the final error makes it sound like it very nearly works. Perhaps you just need to use a different constructor to specify the SQLConf? Maybe this value should be serialized with some object? It might be something that is hard to make work now, but I wonder if there is an easy fix to make the SQL objects recoverable. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But 
when I recovery the program from checkpoint, I encountered an exception: {code} Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at 
org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486754#comment-14486754 ] Saisai Shao commented on SPARK-6770: At first glance, I guess SQLContext may not support the streaming checkpoint mechanism well; it looks like SQLConf could not be recovered through the checkpoint file. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovered the program from the checkpoint, I encountered an exception: {code} Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at 
org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at 
logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486712#comment-14486712 ] Saisai Shao commented on SPARK-6770: I think you have to put your streaming-related logic into the function {{functionToCreateContext}}; you could refer to the related Spark Streaming example {{RecoverableNetworkWordCount}} to change your code. I think it is not a bug; you'd better try again. DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory ssc } val struct = StructType(StructField("log", StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd => { val result = rdd.map(item => { println(item) val result = item._2 match { case e: String => Row.apply(e) case _ => Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "test", overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recovered the program from the checkpoint, I encountered an exception: {code} Exception in thread "main" org.apache.spark.SparkException: 
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at 
org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486737#comment-14486737 ] yangping wu commented on SPARK-6770: Hi Saisai Shao, Thank you for your reply. I've tried to put my streaming-related logic into the function {{functionToCreateContext}}, as follows: {code} def functionToCreateContext() = { val sparkConf = new SparkConf().setAppName(channelAnalyser) val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint(/tmp/kafka/test/offset) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val test = Set(test) val struct = StructType(StructField(log, StringType) ::Nil) val kafkaParams = Map[String, String](metadata.broker.list - 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092, group.id - test-consumer-group111) val url = jdbc:mysql://192.168.100.10:3306/spark?user=adminpassword=123456useUnicode=truecharacterEncoding=utf8autoReconnect=true val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, test) SDB.foreachRDD(rdd = { val result = rdd.map(item = { item._2 match { case e: String = Row.apply(e) case _ = Row.apply() } }) try { println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, testTable, overwrite = false) } catch { case e: Exception = e.printStackTrace() } }) ssc } val ssc = StreamingContext.getOrCreate(/tmp/kafka/test/offset, functionToCreateContext) ssc.start() {code} But when I recover the program from the checkpoint, I encounter an exception: {code} java.lang.NullPointerException at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:217) at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:191) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:132) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:381) at 
logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:57) at logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:40) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) {code} It seems that the SQLContext has not been initialized, so {{settings}} is not initialized in {{org.apache.spark.sql.SQLConf}}. Then {code} private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS, true).toBoolean {code} throws java.lang.NullPointerException. 
DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf()
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486736#comment-14486736 ] yangping wu commented on SPARK-6770: Hi Saisai Shao, Thank you for your reply. I've tried to put my streaming-related logic into the function {{functionToCreateContext}}, as follows: {code} def functionToCreateContext() = { val sparkConf = new SparkConf().setAppName(channelAnalyser) val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint(/tmp/kafka/test/offset) val sqlContext = new org.apache.spark.sql.SQLContext(sc) val test = Set(test) val struct = StructType(StructField(log, StringType) ::Nil) val kafkaParams = Map[String, String](metadata.broker.list - 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092, group.id - test-consumer-group111) val url = jdbc:mysql://192.168.100.10:3306/spark?user=adminpassword=123456useUnicode=truecharacterEncoding=utf8autoReconnect=true val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, test) SDB.foreachRDD(rdd = { val result = rdd.map(item = { item._2 match { case e: String = Row.apply(e) case _ = Row.apply() } }) try { println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, testTable, overwrite = false) } catch { case e: Exception = e.printStackTrace() } }) ssc } val ssc = StreamingContext.getOrCreate(/tmp/kafka/test/offset, functionToCreateContext) ssc.start() {code} But when I recover the program from the checkpoint, I encounter an exception: {code} java.lang.NullPointerException at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:217) at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:191) at org.apache.spark.sql.DataFrame.init(DataFrame.scala:132) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:381) at 
logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:57) at logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:40) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) {code} It seems that the SQLContext has not been initialized, so {{settings}} is not initialized in {{org.apache.spark.sql.SQLConf}}. Then {code} private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS, true).toBoolean {code} throws java.lang.NullPointerException. 
DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf()
[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint
[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486760#comment-14486760 ] yangping wu commented on SPARK-6770: Yes, I think so. How can I solve it using other methods? DirectKafkaInputDStream has not been initialized when recovery from checkpoint -- Key: SPARK-6770 URL: https://issues.apache.org/jira/browse/SPARK-6770 Project: Spark Issue Type: Bug Components: Streaming Affects Versions: 1.3.0 Reporter: yangping wu I am reading data from Kafka using the createDirectStream method and saving the received log to MySQL; the code snippet is as follows: {code} def functionToCreateContext(): StreamingContext = { val sparkConf = new SparkConf() val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint(/tmp/kafka/channel/offset) // set checkpoint directory ssc } val struct = StructType(StructField(log, StringType) ::Nil) // Get StreamingContext from checkpoint data or create a new one val ssc = StreamingContext.getOrCreate(/tmp/kafka/channel/offset, functionToCreateContext) val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) SDB.foreachRDD(rdd = { val result = rdd.map(item = { println(item) val result = item._2 match { case e: String = Row.apply(e) case _ = Row.apply() } result }) println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, test, overwrite = false) }) ssc.start() ssc.awaitTermination() ssc.stop() {code} But when I recover the program from the checkpoint, I encounter an exception: {code} Exception in thread main org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) at 
org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) at 
logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at