[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-05-08 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534078#comment-14534078
 ] 

Tathagata Das commented on SPARK-6770:
--

Was this problem solved? I think I discuss this explicitly in the Streaming 
guide here: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#dataframe-and-sql-operations

If this solves the issue, I am inclined to close this JIRA. Either way, this is 
not a problem with DirectKafkaInputDStream, as the JIRA title seems to indicate.
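
For reference, the pattern that guide section describes is, roughly, a lazily 
instantiated singleton SQLContext that is re-created on recovery rather than 
restored from the checkpoint. A minimal sketch ({{SQLContextSingleton}} and 
{{stream}} are illustrative names following the guide, not the reporter's code):
{code}
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Lazily instantiated singleton, so the SQLContext is never written into the
// checkpoint and is rebuilt fresh on the recovered driver.
object SQLContextSingleton {
  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

stream.foreachRDD { rdd =>
  // Obtain the SQLContext inside the closure, from the recovered SparkContext
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  // ... create DataFrames and write them out here ...
}
{code}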

 DirectKafkaInputDStream has not been initialized when recovery from checkpoint
 --

 Key: SPARK-6770
 URL: https://issues.apache.org/jira/browse/SPARK-6770
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: yangping wu

 I am reading data from Kafka using the createDirectStream method and saving 
 the received logs to MySQL; the code snippet is as follows:
 {code}
 def functionToCreateContext(): StreamingContext = {
   val sparkConf = new SparkConf()
   val sc = new SparkContext(sparkConf)
   val ssc = new StreamingContext(sc, Seconds(10))
   ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory
   ssc
 }
 val struct = StructType(StructField("log", StringType) :: Nil)
 // Get StreamingContext from checkpoint data or create a new one
 val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset",
   functionToCreateContext)
 val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder,
   StringDecoder](ssc, kafkaParams, topics)
 val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
 SDB.foreachRDD(rdd => {
   val result = rdd.map(item => {
     println(item)
     val result = item._2 match {
       case e: String => Row.apply(e)
       case _ => Row.apply()
     }
     result
   })
   println(result.count())
   val df = sqlContext.createDataFrame(result, struct)
   df.insertIntoJDBC(url, "test", overwrite = false)
 })
 ssc.start()
 ssc.awaitTermination()
 ssc.stop()
 {code}
 But when I recover the program from the checkpoint, I encounter an exception:
 {code}
 Exception in thread "main" org.apache.spark.SparkException: org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not been initialized
   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
   at org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
   at scala.Option.orElse(Option.scala:257)
   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
   at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
   at logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala)
   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
   at java.lang.reflect.Method.invoke(Method.java:597)
 {code}
 

[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-05-08 Thread yangping wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534209#comment-14534209
 ] 

yangping wu commented on SPARK-6770:


Hi [~tdas], I used the code you mentioned, and it successfully recovered from 
the checkpoint. That solves the issue. Thank you.


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-05-08 Thread Tathagata Das (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14534240#comment-14534240
 ] 

Tathagata Das commented on SPARK-6770:
--

Awesome! I am closing this JIRA then!


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-09 Thread yangping wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486794#comment-14486794
 ] 

yangping wu commented on SPARK-6770:


OK, thank you very much for your reply. I will try to write a pure Spark 
Streaming program and use plain Scala JDBC to write the data to MySQL.


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-09 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486782#comment-14486782
 ] 

Saisai Shao commented on SPARK-6770:


From my understanding, the basic scenario of your code is putting the Kafka 
data into a database using JDBC, and you want to leverage Spark SQL for an 
easy implementation. If you want to use the checkpoint file to recover from 
driver failure, I think it would be better to write a pure Spark Streaming 
program: Spark Streaming's checkpointing mechanism only guarantees that 
streaming-related metadata is written and recovered. The more you rely on 
third-party tools, the less the current mechanism can recover; see the sketch 
below.
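
As an illustration, a minimal sketch of such a pure-streaming write path, 
using plain JDBC inside {{foreachPartition}} so no SQL objects end up in the 
checkpointed graph ({{stream}}, {{jdbcUrl}} and the table layout are 
placeholder assumptions, not the reporter's actual values):
{code}
import java.sql.DriverManager

stream.foreachRDD { rdd =>
  rdd.foreachPartition { partition =>
    // One connection per partition, opened on the executor; nothing here
    // needs to survive a checkpoint recovery.
    val conn = DriverManager.getConnection(jdbcUrl)
    try {
      val stmt = conn.prepareStatement("INSERT INTO test (log) VALUES (?)")
      partition.foreach { case (_, value) =>
        stmt.setString(1, value)
        stmt.executeUpdate()
      }
      stmt.close()
    } finally {
      conn.close()
    }
  }
}
{code}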

 DirectKafkaInputDStream has not been initialized when recovery from checkpoint
 --

 Key: SPARK-6770
 URL: https://issues.apache.org/jira/browse/SPARK-6770
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: yangping wu

 I am  read data from kafka using createDirectStream method and save the 
 received log to Mysql, the code snippets as follows
 {code}
 def functionToCreateContext(): StreamingContext = {
   val sparkConf = new SparkConf()
   val sc = new SparkContext(sparkConf)
   val ssc = new StreamingContext(sc, Seconds(10))
   ssc.checkpoint(/tmp/kafka/channel/offset) // set checkpoint directory
   ssc
 }
 val struct = StructType(StructField(log, StringType) ::Nil)
 // Get StreamingContext from checkpoint data or create a new one
 val ssc = StreamingContext.getOrCreate(/tmp/kafka/channel/offset, 
 functionToCreateContext)
 val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, 
 StringDecoder](ssc, kafkaParams, topics)
 val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
 SDB.foreachRDD(rdd = {
   val result = rdd.map(item = {
 println(item)
 val result = item._2 match {
   case e: String = Row.apply(e)
   case _ = Row.apply()
 }
 result
   })
   println(result.count())
   val df = sqlContext.createDataFrame(result, struct)
   df.insertIntoJDBC(url, test, overwrite = false)
 })
 ssc.start()
 ssc.awaitTermination()
 ssc.stop()
 {code}
 But when I  recovery the program from checkpoint, I encountered an exception:
 {code}
 Exception in thread main org.apache.spark.SparkException: 
 org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not 
 been initialized
   at 
 org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
   at 
 org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51)
   at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
   at 
 org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
   at scala.Option.orElse(Option.scala:257)
   at 
 org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
   at 
 org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
   at 
 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
   at 
 org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
   at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at 
 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
   at 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at 
 scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
   at 
 org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
   at 
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223)
   at 
 org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218)
   at 
 scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
   at 
 org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218)
   at 
 org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89)
   at 
 org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
   at 
 org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
   at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57)
   at 

[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-09 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14487103#comment-14487103
 ] 

Sean Owen commented on SPARK-6770:
--

That may be so, but it's not obvious that you simply can't use Spark SQL with 
Streaming recovery. For example, the final error makes it sound like it very 
nearly works. Perhaps you just need to use a different constructor to specify 
the SQLConf? Maybe this value should be serialized with some object? It might 
be something that is hard to make work now, but I wonder if there is an easy 
fix to make the SQL objects recoverable.


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-08 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486754#comment-14486754
 ] 

Saisai Shao commented on SPARK-6770:


I guess SQLContext may not fully support the streaming checkpoint mechanism; 
at first glance, it looks like SQLConf cannot be recovered through the 
checkpoint file.


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-08 Thread Saisai Shao (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486712#comment-14486712
 ] 

Saisai Shao commented on SPARK-6770:


I think you have to put your streaming-related logic into the function 
{{functionToCreateContext}}; you could refer to the related Spark Streaming 
example {{RecoverableNetworkWordCount}} to change your code (see the sketch 
below).

I think it is not a bug; you'd better try again.
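
Roughly, the structure is the following: the whole DStream graph, including 
the output actions, is built inside the factory function, so {{getOrCreate}} 
can either rebuild it or restore it fully initialized from the checkpoint 
({{checkpointDir}}, {{kafkaParams}} and {{topics}} stand in for your values):
{code}
def functionToCreateContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf(), Seconds(10))
  ssc.checkpoint(checkpointDir)
  // The input stream and every transformation/output on it are set up here,
  // inside the factory, never outside on a restored context.
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
    StringDecoder](ssc, kafkaParams, topics)
  stream.foreachRDD { rdd =>
    // output action goes here
  }
  ssc
}

val ssc = StreamingContext.getOrCreate(checkpointDir, functionToCreateContext)
ssc.start()
ssc.awaitTermination()
{code}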


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-08 Thread yangping wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486737#comment-14486737
 ] 

yangping wu commented on SPARK-6770:


Hi Saisai Shao, thank you for your reply. I've tried to put my streaming-related 
logic into the function {{functionToCreateContext}}, as follows:
{code}
def functionToCreateContext() = {
  val sparkConf = new SparkConf().setAppName("channelAnalyser")
  val sc = new SparkContext(sparkConf)
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint("/tmp/kafka/test/offset")
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val test = Set("test")
  val struct = StructType(StructField("log", StringType) :: Nil)
  val kafkaParams = Map[String, String](
    "metadata.broker.list" -> "192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092",
    "group.id" -> "test-consumer-group111")
  val url =
    "jdbc:mysql://192.168.100.10:3306/spark?user=admin&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true"

  val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder,
    StringDecoder](ssc, kafkaParams, test)

  SDB.foreachRDD(rdd => {
    val result = rdd.map(item => {
      item._2 match {
        case e: String => Row.apply(e)
        case _ => Row.apply()
      }
    })
    try {
      println(result.count())
      val df = sqlContext.createDataFrame(result, struct)
      df.insertIntoJDBC(url, "testTable", overwrite = false)
    } catch {
      case e: Exception => e.printStackTrace()
    }
  })

  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/kafka/test/offset",
  functionToCreateContext)
ssc.start()
{code}
But when I recover the program from the checkpoint, I encounter an exception:
{code}
java.lang.NullPointerException
  at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:217)
  at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:191)
  at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132)
  at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
  at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:381)
  at logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:57)
  at logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:40)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
  at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
  at scala.util.Try$.apply(Try.scala:161)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175)
  at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  at java.lang.Thread.run(Thread.java:619)
{code}

It seems that the SQLContext has not been initialized, so {{settings}} is not 
initialized in {{org.apache.spark.sql.SQLConf}}; then
{code}
private[spark] def dataFrameEagerAnalysis: Boolean =
  getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean
{code}
throws a java.lang.NullPointerException.
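
If that is what's happening, one workaround sketch (untested) would be to stop 
capturing the factory-created {{sqlContext}} in the output closure, which is 
what gets serialized into the checkpoint, and instead build one from the 
recovered SparkContext inside {{foreachRDD}}:
{code}
SDB.foreachRDD(rdd => {
  // Built per batch from the live SparkContext, so nothing half-restored
  // from the checkpoint is touched. A lazy singleton, as in the streaming
  // guide's DataFrame section, would avoid the per-batch construction cost.
  val sqlContext = new org.apache.spark.sql.SQLContext(rdd.sparkContext)
  val result = rdd.map(item => Row(item._2))
  val df = sqlContext.createDataFrame(result, struct)
  df.insertIntoJDBC(url, "testTable", overwrite = false)
})
{code}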


 DirectKafkaInputDStream has not been initialized when recovery from checkpoint
 --

 Key: SPARK-6770
 URL: https://issues.apache.org/jira/browse/SPARK-6770
 Project: Spark
  Issue Type: Bug
  Components: Streaming
Affects Versions: 1.3.0
Reporter: yangping wu

 I am  read data from kafka using createDirectStream method and save the 
 received log to Mysql, the code snippets as follows
 {code}
 def functionToCreateContext(): StreamingContext = {
   val sparkConf = new SparkConf()
 


[jira] [Commented] (SPARK-6770) DirectKafkaInputDStream has not been initialized when recovery from checkpoint

2015-04-08 Thread yangping wu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486760#comment-14486760
 ] 

yangping wu commented on SPARK-6770:


Yes, I think so. 
Is there another way to solve it?
