I can't say what is happening, but I have a similar problem here. While for you the source is:
org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized, for me it is: org.apache.spark.SparkException: org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has not been initialized. Here, the problem started after I changed my main class to use another class to execute the stream. Before: object TopStream { //everything here } After: object TopStream { // call new TopStream.process( ... ) } class TopStream extends Serializable { } Tiago Albineli Motta Desenvolvedor de Software - Globo.com ICQ: 32107100 http://programandosemcafeina.blogspot.com On Wed, Jul 29, 2015 at 12:59 PM, Sadaf <sa...@platalytics.com> wrote: > Hi > > I am new to Spark Streaming and writing a code for twitter connector. when > i > run this code more than one time, it gives the following exception. I have > to create a new hdfs directory for checkpointing each time to make it run > successfully and moreover it doesn't get stopped. > > ERROR StreamingContext: Error starting the context, marking it as stopped > org.apache.spark.SparkException: > org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been > initialized > at > org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321) > at > > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > at > > org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342) > at scala.Option.orElse(Option.scala:257) > at > org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339) > at > > org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) > at > > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) > at > > org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120) > at > > scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > > 
scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) > at > > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) > at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) > at > > org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120) > at > > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227) > at > > org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222) > at > > scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) > at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) > at > > org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222) > at > > org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92) > at > > org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73) > at > > org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588) > at > > org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) > at twitter.streamingSpark$.twitterConnector(App.scala:38) > at twitter.streamingSpark$.main(App.scala:26) > at twitter.streamingSpark.main(App.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > > org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) > at > org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) > at 
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) > at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) > at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) > > The relevant code is > > def twitterConnector() :Unit = > { > val atwitter=managingCredentials() > > val ssc=StreamingContext.getOrCreate("hdfsDirectory",()=> { > managingContext() }) > fetchTweets(ssc, atwitter ) > > ssc.start() // Start the computation > ssc.awaitTermination() > > } > > def managingContext():StreamingContext = > { > //making spark context > val conf = new > SparkConf().setMaster("local[*]").setAppName("twitterConnector") > val ssc = new StreamingContext(conf, Seconds(1)) > val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext) > import sqlContext.implicits._ > > //checkpointing > ssc.checkpoint("hdfsDirectory") > ssc > } > def fetchTweets (ssc : StreamingContext , atwitter : > Option[twitter4j.auth.Authorization]) : Unit = { > > > val tweets > =TwitterUtils.createStream(ssc,atwitter,Nil,StorageLevel.MEMORY_AND_DISK_2) > val twt = tweets.window(Seconds(10),Seconds(10)) > //checkpoint duration > /twt.checkpoint(new Duration(1000)) > > //processing > case class Tweet(createdAt:Long, text:String) > twt.map(status=> > Tweet(status.getCreatedAt().getTime()/1000, status.getText()) > ) > twt.print() > } > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/Error-in-starting-Spark-Streaming-Context-tp24063.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. > > --------------------------------------------------------------------- > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org > For additional commands, e-mail: user-h...@spark.apache.org > >