Hi, I am new to Spark Streaming and am writing code for a Twitter connector. I am getting the following exception when starting the streaming context:
ERROR StreamingContext: Error starting the context, marking it as stopped org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) at scala.Option.orElse(Option.scala:257) at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227) at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222) at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92) at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73) 
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588) at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) at twitter.streamingSpark$.twitterConnector(App.scala:38) at twitter.streamingSpark$.main(App.scala:26) at twitter.streamingSpark.main(App.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) The relevant code is: def twitterConnector() :Unit = { val atwitter=managingCredentials() val ssc=StreamingContext.getOrCreate("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir",()=> { managingContext() }) fetchTweets(ssc, atwitter ) ssc.start() // Start the computation ssc.awaitTermination() } def managingContext():StreamingContext = { //making spark context val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector") val ssc = new StreamingContext(conf, Seconds(1)) val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) import sqlContext.implicits._ //checkpointing ssc.checkpoint("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir") ssc } def fetchTweets (ssc : StreamingContext , atwitter : Option[twitter4j.auth.Authorization]) : Unit = { val tweets =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2) val twt = tweets.window(Seconds(10),Seconds(10)) //checkpoint duration /twt.checkpoint(new Duration(1000)) //processing 
case class Tweet(createdAt:Long, text:String) twt.map(status=> Tweet(status.getCreatedAt().getTime()/1000, status.getText()) ) twt.print() } Can anyone help me in this regard? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-tp24058.html Sent from the Apache Spark User List mailing list archive at Nabble.com. --------------------------------------------------------------------- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org