A side question: is there a reason you're using window(Seconds(10), Seconds(10))
instead of new StreamingContext(conf, Seconds(10))?
Setting the micro-batch interval to 10 seconds instead of 1 gives you
the same 10-second window with less complexity. Of course, this might just
be a test of the window functionality.
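
To make the comparison concrete, here is a rough sketch of the two variants side by side. It assumes spark-streaming and spark-streaming-twitter (Spark 1.x) are on the classpath; the object name BatchIntervalSketch and the function names windowed and batched are made up for illustration and are not from the original code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import twitter4j.auth.Authorization

// Hypothetical helper object, for illustration only.
object BatchIntervalSketch {

  // Variant A (as in the original post): 1-second batches, regrouped by a
  // tumbling window whose length and slide are both 10 seconds.
  def windowed(conf: SparkConf, auth: Option[Authorization]): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(1))
    val tweets = TwitterUtils.createStream(ssc, auth, Nil, StorageLevel.MEMORY_AND_DISK_2)
    tweets.window(Seconds(10), Seconds(10)).map(_.getText).print()
    ssc
  }

  // Variant B (the suggestion above): 10-second batches, so every batch
  // already covers 10 seconds of tweets and no window is needed.
  def batched(conf: SparkConf, auth: Option[Authorization]): StreamingContext = {
    val ssc = new StreamingContext(conf, Seconds(10))
    val tweets = TwitterUtils.createStream(ssc, auth, Nil, StorageLevel.MEMORY_AND_DISK_2)
    tweets.map(_.getText).print()
    ssc
  }
}
```

Both variants print one group of tweets every 10 seconds. Build only one of them per JVM, since each call creates its own SparkContext.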

-kr, Gerard.

On Wed, Jul 29, 2015 at 10:54 AM, Sadaf <sa...@platalytics.com> wrote:

> Hi,
>
> I am new to Spark Streaming and am writing code for a Twitter connector.
> I am getting the following exception:
>
> ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized
>         at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>         at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>         at scala.Option.orElse(Option.scala:257)
>         at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>         at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>         at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>         at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>         at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>         at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>         at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>         at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>         at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>         at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>         at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
>         at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>         at twitter.streamingSpark$.twitterConnector(App.scala:38)
>         at twitter.streamingSpark$.main(App.scala:26)
>         at twitter.streamingSpark.main(App.scala)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:606)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> The relevant code is:
>
> def twitterConnector(): Unit = {
>   val atwitter = managingCredentials()
>
>   val ssc = StreamingContext.getOrCreate(
>     "hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir",
>     () => { managingContext() })
>   fetchTweets(ssc, atwitter)
>
>   ssc.start()             // Start the computation
>   ssc.awaitTermination()
> }
>
> def managingContext(): StreamingContext = {
>   //making spark context
>   val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")
>   val ssc = new StreamingContext(conf, Seconds(1))
>   val sqlContext = new org.apache.spark.sql.SQLContext(ssc.sparkContext)
>   import sqlContext.implicits._
>
>   //checkpointing
>   ssc.checkpoint("hdfs://192.168.23.109:9000/home/cloud9/twitterCheckpointDir")
>   ssc
> }
>
> def fetchTweets(ssc: StreamingContext, atwitter: Option[twitter4j.auth.Authorization]): Unit = {
>   val tweets = TwitterUtils.createStream(ssc, atwitter, Nil, StorageLevel.MEMORY_AND_DISK_2)
>   val twt = tweets.window(Seconds(10), Seconds(10))
>   //checkpoint duration
>   //twt.checkpoint(new Duration(1000))
>
>   //processing
>   case class Tweet(createdAt: Long, text: String)
>   twt.map(status =>
>     Tweet(status.getCreatedAt().getTime() / 1000, status.getText())
>   )
>   twt.print()
> }
>
> Can anyone help me in this regard?
>
