[ https://issues.apache.org/jira/browse/SPARK-6770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14486737#comment-14486737 ]
yangping wu commented on SPARK-6770: ------------------------------------ Hi Saisai Shao, Thank you for your reply. I've tried to put my streaming-related logic into the function <tt>functionToCreateContext</tt>, as follows: {code} def functionToCreateContext() = { val sparkConf = new SparkConf().setAppName("channelAnalyser") val sc = new SparkContext(sparkConf) val ssc = new StreamingContext(sc, Seconds(10)) ssc.checkpoint("/tmp/kafka/test/offset") val sqlContext = new org.apache.spark.sql.SQLContext(sc) val test = Set("test") val struct = StructType(StructField("log", StringType) ::Nil) val kafkaParams = Map[String, String]("metadata.broker.list" -> "192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092", "group.id" -> "test-consumer-group111") val url = "jdbc:mysql://192.168.100.10:3306/spark?user=admin&password=123456&useUnicode=true&characterEncoding=utf8&autoReconnect=true" val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, test) SDB.foreachRDD(rdd => { val result = rdd.map(item => { item._2 match { case e: String => Row.apply(e) case _ => Row.apply("") } }) try { println(result.count()) val df = sqlContext.createDataFrame(result, struct) df.insertIntoJDBC(url, "testTable", overwrite = false) } catch { case e: Exception => e.printStackTrace() } }) ssc } val ssc = StreamingContext.getOrCreate("/tmp/kafka/test/offset", functionToCreateContext) ssc.start() {code} But when I recover the program from the checkpoint, I encounter an exception: {code} java.lang.NullPointerException at org.apache.spark.sql.SQLConf.getConf(SQLConf.scala:217) at org.apache.spark.sql.SQLConf.dataFrameEagerAnalysis(SQLConf.scala:191) at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:132) at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51) at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:381) at 
logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:57) at logstatstreaming.FlightSearchTodb$$anonfun$logstatstreaming$FlightSearchTodb$$functionToCreateContext$1$1.apply(FlightSearchTodb.scala:40) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1.apply(DStream.scala:534) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:42) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:176) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:175) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619) {code} It seems that the SQLContext has not been initialized, so <tt>settings</tt> is not initialized in <tt>org.apache.spark.sql.SQLConf</tt>. Then {code} private[spark] def dataFrameEagerAnalysis: Boolean = getConf(DATAFRAME_EAGER_ANALYSIS, "true").toBoolean {code} throws java.lang.NullPointerException. 
> DirectKafkaInputDStream has not been initialized when recovery from checkpoint > ------------------------------------------------------------------------------ > > Key: SPARK-6770 > URL: https://issues.apache.org/jira/browse/SPARK-6770 > Project: Spark > Issue Type: Bug > Components: Streaming > Affects Versions: 1.3.0 > Reporter: yangping wu > > I am reading data from Kafka using the createDirectStream method and saving the > received logs to MySQL; the code snippet is as follows: > {code} > def functionToCreateContext(): StreamingContext = { > val sparkConf = new SparkConf() > val sc = new SparkContext(sparkConf) > val ssc = new StreamingContext(sc, Seconds(10)) > ssc.checkpoint("/tmp/kafka/channel/offset") // set checkpoint directory > ssc > } > val struct = StructType(StructField("log", StringType) ::Nil) > // Get StreamingContext from checkpoint data or create a new one > val ssc = StreamingContext.getOrCreate("/tmp/kafka/channel/offset", > functionToCreateContext) > val SDB = KafkaUtils.createDirectStream[String, String, StringDecoder, > StringDecoder](ssc, kafkaParams, topics) > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) > SDB.foreachRDD(rdd => { > val result = rdd.map(item => { > println(item) > val result = item._2 match { > case e: String => Row.apply(e) > case _ => Row.apply("") > } > result > }) > println(result.count()) > val df = sqlContext.createDataFrame(result, struct) > df.insertIntoJDBC(url, "test", overwrite = false) > }) > ssc.start() > ssc.awaitTermination() > ssc.stop() > {code} > But when I recover the program from the checkpoint, I encounter an exception: > {code} > Exception in thread "main" org.apache.spark.SparkException: > org.apache.spark.streaming.kafka.DirectKafkaInputDStream@41a80e5a has not > been initialized > at > org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266) > at > org.apache.spark.streaming.dstream.InputDStream.isTimeValid(InputDStream.scala:51) > at > 
org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) > at > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) > at scala.Option.orElse(Option.scala:257) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284) > at > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) > at > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:223) > at > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:218) > at > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at > org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:218) > at > org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:89) > at > org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67) > at > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512) > at logstatstreaming.UserChannelTodb$.main(UserChannelTodb.scala:57) > at 
logstatstreaming.UserChannelTodb.main(UserChannelTodb.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) > at java.lang.reflect.Method.invoke(Method.java:597) > at > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) > at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > {code} > Not sure if this is a bug or a feature, but it's not obvious, so I wanted to > create a JIRA to make sure we document this behavior. Can someone help me > understand the cause? Thank you. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org