[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14487103#comment-14487103
 ] 

Sean Owen commented on SPARK-6770:
----------------------------------

That may be so, but it's not obvious that you simply can't use Spark SQL with 
Streaming recovery. For example, the final error makes it sound like it very 
nearly works. Perhaps you just need to use a different constructor to specify 
the SQLConf? maybe this value should be serialized with some object? It might 
be something that is hard to make work now but I wonder if there is an easy fix 
to make the SQL objects "recoverable".

> DirectKafkaInputDStream has not been initialized when recovery from checkpoint
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-6770
>                 URL: https://issues.apache.org/jira/browse/SPARK-6770
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: yangping wu
>
> I am  read data from kafka using createDirectStream method and save the 
> received log to Mysql, the code snippets as follows
> {code}
>     def functionToCreateContext(): StreamingContext = {
>       val sparkConf = new SparkConf()
>       val sc = new SparkContext(sparkConf)
>       val ssc = new StreamingContext(sc, Seconds(10))
>       ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory
>       ssc
>     }
>     val struct = StructType(StructField("log", StringType) ::Nil)
>     // Get StreamingContext from checkpoint data or create a new one
>     val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", 
> functionToCreateContext)
>     val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, 
> StringDecoder](ssc, kafkaParams, topics)
>     val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>     SDB.foreachRDD(rdd => {
>       val result = rdd.map(item => {
>         println(item)
>         val result = item._2 match {
>           case e: String => Row.apply(e)
>           case _ => Row.apply("")
>         }
>         result
>       })
>       println(result.count())
>       val df = sqlContext.createDataFrame(result, struct)
>       df.insertIntoJDBC(url, "test", overwrite = false)
>     })
>     ssc.start()
>     ssc.awaitTermination()
>     ssc.stop()
> {code}
> But when I  recovery the program from checkpoint, I encountered an exception:
> {code}
> Exception in thread "main" org.apache.spark.SparkException: 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not 
> been initialized
>       at 
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
>       at 
> org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>       at 
> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
>       at scala.Option.orElse(Option.scala:257)
>       at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
>       at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>       at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>       at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
>       at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>       at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
>       at 
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
>       at 
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
>       at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
>       at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
>       at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>       at java.lang.reflect.Method.invoke(Method.java:597)
>       at 
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
>       at 
> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
>       at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
>       at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
>       at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> {code}
> Not sure if this is a bug or a feature, but it's not obvious, so wanted to 
> create a JIRA to make sure we document this behavior.Is someone can help me 
> to see the reasons? Thank you.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to