[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486712#comment-14486712
 ] 

Saisai Shao commented on SPARK-6770:
------------------------------------

I think you have to put your streaming related logic into the function 
{{functionToCreateContext}}, you could refer to the related Spark Streaming 
example {{RecoverableNetworkWordCount}} to change your code. 

I think it is not a bug, you'd better try again.

> DirectKafkaInputDStream has not been initialized when recovery from checkpoint
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-6770
>                 URL: https://issues.apache.org/jira/browse/SPARK-6770
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: yangping wu
>
> I am  read data from kafka using createDirectStream method and save the 
> received log to Mysql, the code snippets as follows
> {code}
>     def functionToCreateContext(): StreamingContext = {
>       val sparkConf = new SparkConf()
>       val sc = new SparkContext(sparkConf)
>       val ssc = new StreamingContext(sc, Seconds(10))
>       ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory
>       ssc
>     }
>     val struct = StructType(StructField("log", StringType) ::Nil)
>     // Get StreamingContext from checkpoint data or create a new one
>     val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", 
> functionToCreateContext)
>     val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topics)
>     val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>     SDB.foreachRDD(rdd => {
>       val result = rdd.map(item => {
>         println(item)
>         val result = item._2 match {
>           case e: String => Row.apply(e)
>           case _ => Row.apply("")
>         }
>         result
>       })
>       println(result.count())
>       val df = sqlContext.createDataFrame(result, struct)
>       df.insertIntoJDBC(url, "test", overwrite = false)
>     })
>     ssc.start()
>     ssc.awaitTermination()
>     ssc.stop()
> {code}
> But when I  recovery the program from checkpoint, I encountered an exception:
> {code}
> Exception in thread "main" org.apache.spark.SparkException: 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not 
> been initialized
>       at 
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
>       at 
> org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>       at scala.Option.orElse(Option.scala:257)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
>       at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
>       at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
>       at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>       at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.Is someone can help me 
> to see the reasons? Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to