Solved!
The problem has nothing to do with the class and object refactoring. But in the
process of that refactoring I made a change that is similar to the one in your
code. Before the refactoring, I set up the DStream processing inside the
function that I passed to StreamingContext.getOrCreate. Afterwards, I started
setting up the DStream on the context returned by StreamingContext.getOrCreate,
and that is what broke recovery from the checkpoint.
So you should call *fetchTweets* inside *managingContext*.
That worked for me.
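A minimal sketch of the pattern that worked for me. This is not my real code: the checkpoint path, the socket source, and the bodies of *managingContext* and *fetchTweets* are placeholders; the important part is that all DStream setup happens inside the function passed to StreamingContext.getOrCreate, so it can be replayed when the context is recovered from the checkpoint.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TopStream {
  val checkpointDir = "hdfs:///tmp/topstream-checkpoint" // placeholder path

  // Hypothetical stand-in for the real DStream pipeline.
  def fetchTweets(ssc: StreamingContext): Unit = {
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.window(Seconds(60)).count().print()
  }

  // The whole DStream graph is defined here, inside the creating function.
  def managingContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("TopStream")
    val ssc  = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    fetchTweets(ssc) // right place: inside the function given to getOrCreate
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, managingContext _)
    // Wrong place: calling fetchTweets(ssc) here, on the returned context,
    // leaves the recovered DStreams uninitialized and triggers
    // "...DStream@... has not been initialized" on restart.
    ssc.start()
    ssc.awaitTermination()
  }
}
```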
Tiago
Tiago Albineli Motta
Software Developer - Globo.com
ICQ: 32107100
http://programandosemcafeina.blogspot.com
On Thu, Oct 22, 2015 at 11:22 AM, Tiago Albineli Motta
wrote:
> I can't say exactly what is happening, but I have a similar problem here.
>
> While for you the source is:
>
> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
> initialized
>
>
> For me it is:
>
> org.apache.spark.SparkException:
> org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has not
> been initialized
>
>
> Here, the problem started after I changed my main class to use another class
> to execute the stream.
>
>
> Before:
>
>
> object TopStream {
>
> //everything here
>
> }
>
>
> After:
>
>
> object TopStream {
>
>   // call new TopStream().process( ... )
>
> }
>
>
> class TopStream extends Serializable {
>
> }
>
>
>
>
>
> Tiago Albineli Motta
> Software Developer - Globo.com
> ICQ: 32107100
> http://programandosemcafeina.blogspot.com
>
> On Wed, Jul 29, 2015 at 12:59 PM, Sadaf wrote:
>
>> Hi
>>
>> I am new to Spark Streaming and am writing code for a Twitter connector.
>> When I run this code more than once, it gives the following exception. I
>> have to create a new HDFS directory for checkpointing each time to make it
>> run successfully, and moreover it doesn't stop.
>>
>> ERROR StreamingContext: Error starting the context, marking it as stopped
>> org.apache.spark.SparkException:
>> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
>> initialized
>>   at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>   at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>>   at scala.Option.orElse(Option.scala:257)
>>   at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>>   at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>   at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>>   at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>>   at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>>   at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>>   at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>   at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>>   at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>>   at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>>   at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
>>   at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>>   at twitter.streamingSpark$.twitterConnector(App.scala:38)
>>   at twitter.streamingSpark$.main(App.scala:26)
>>   at twitter.streamingSpark.main(App.scala)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   at java.lang.reflect.Method.invoke(Method.java:606)
>>   at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>>   at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>>