I bring up a Spark Streaming job that uses Kafka as its input source.
There is no data to process, so I shut it down and then bring it back up again.
This time the job does not start, because it complains that a DStream has not been
initialized:
15/06/26 01:10:44 ERROR yarn.ApplicationMaster: User class threw exception: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.UnionDStream@6135e5d8 has not been initialized
    at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:266)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287)
    at scala.Option.orElse(Option.scala:257)
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:284)
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:116)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:116)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:90)
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:67)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:512)
    ..........
I am using Spark 1.3.1 and spark-streaming-kafka 1.3.1.
Any idea how to resolve this issue?
Thanks
Ashish