Hi again,
Just FYI, I found the mistake in my code regarding the restartability of Spark Streaming. I had a method providing a context (either retrieved from the checkpoint or, if no checkpoint was available, built anew), and I was then building and starting a stream on that context in both cases. The mistake is that we should not build a stream on a context retrieved from a checkpoint, since it already contains all the necessary elements. The stream has to be built inside the function that creates the new context, as illustrated in the RecoverableNetworkWordCount example:

https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala

S

On Fri, Sep 26, 2014 at 3:09 PM, Svend Vanderveken <svend.vanderve...@gmail.com> wrote:

> Hi all,
>
> I apologise for re-posting this, I realise some mail systems are filtering
> all the code samples from the original post.
>
> I would greatly appreciate any pointers regarding this issue; it basically
> renders Spark Streaming not fault-tolerant for us.
>
> Thanks in advance,
>
> S
>
> ---
>
> "
> I experience Spark Streaming restart issues similar to what is discussed
> in the 2 threads below (in which I failed to find a solution). Could
> anybody let me know if anything is wrong in the way I start/stop or if this
> could be a Spark bug?
>
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html
>
> My stream reads a Kafka topic, does some processing involving an
> updateStateByKey and saves the result to HDFS.
>
> The context is (re)-created at startup as follows:
>
>   def streamContext() = {
>
>     def newContext() = {
>       val ctx = new StreamingContext(sparkConf, Duration(10000))
>       ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
>       ctx
>     }
>
>     StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/", newContext)
>   }
>
> And the start-up and shutdown of the stream is handled as follows:
>
>   try {
>
>     val sparkContext = streamContext()
>
>     [.. build stream here...]
>
>     sparkContext.start()
>     sparkContext.awaitTermination()
>
>   } catch {
>     case e: Throwable =>
>       log.error("shutting down tabulation stream...", e)
>       sparkContext.stop()
>       log.info("...waiting termination...")
>       sparkContext.awaitTermination()
>       log.info("...tabulation stream stopped")
>   }
>
> When starting the stream for the first time (with spark-submit), the
> processing happens successfully, folders are created on the target HDFS
> folder and streaming stats are visible on http://sparkhost:4040/streaming.
>
> After letting the streaming work several minutes and then stopping it
> (ctrl-c on the command line), the following info is visible in the
> checkpoint folder:
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
> 14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Found 11 items
> drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
> -rw-r--r-- 3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
> -rw-r--r-- 3 mnubohadoop hadoop 5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
> -rw-r--r-- 3 mnubohadoop hadoop 5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
> -rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
> -rw-r--r-- 3 mnubohadoop hadoop 5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
> -rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
> 14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Found 2 items
> drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
> drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542
>
> (checkpoint clean-up seems to happen since the stream ran for much more
> than 5 times 10 seconds)
>
> When re-starting the stream, the startup fails with the error below,
> http://sparkhost:4040/streaming shows no statistics, no new HDFS folder
> is added in the target folder and no new checkpoints are created:
>
> 09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized
>     at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:263) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:290) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:210) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:209) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>     at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:209) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:80) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:66) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>     at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:444) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
> 14/09/25 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Found 12 items
> drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
> drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
> -rw-r--r-- 3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
> -rw-r--r-- 3 mnubohadoop hadoop 5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
> -rw-r--r-- 3 mnubohadoop hadoop 5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
> -rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
> -rw-r--r-- 3 mnubohadoop hadoop 5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
> -rw-r--r-- 3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
> -rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
>
> Now if I delete all older checkpoints and keep only the most recent one:
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
> 14/09/25 10:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> Found 3 items
> drwxr-xr-x - mnubohadoop hadoop 0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
> -rw-r--r-- 3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
> -rw-r--r-- 3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
>
> I end up with this (Kafka?) non-unique actor name error:
>
> 10:07:25.088 [Result resolver thread-0] WARN o.a.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 3.0 (TID 73, vm21-hulk-priv.mtl.mnubo.com): akka.actor.InvalidActorNameException: actor name [Receiver-0-1411654045063] is not unique!
>     akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
>     akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
>     akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
>     akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
>     akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
>     akka.actor.ActorCell.attachChild(ActorCell.scala:338)
>     akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
>     org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
>     org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
>     org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>     org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>     org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>     org.apache.spark.scheduler.Task.run(Task.scala:54)
>     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
> If I delete the checkpoint folder the stream starts successfully (but I
> lose my ongoing stream state, obviously)
>
> We're running spark 1.1.0 on Mesos 0.20. Our spark assembly is packaged
> with CDH 5.1.0 and Hive:
>
> sbt/sbt clean assembly/assembly -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
> ./make-distribution.sh --tgz --skip-java-test -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
>
> Any comment or suggestion would be greatly appreciated.
> "
>
> On Thu, Sep 25, 2014 at 4:20 PM, Svend <svend.vanderve...@gmail.com> wrote:
>
>> [...]
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Systematic-error-when-re-starting-Spark-stream-unless-I-delete-all-checkpoints-tp15142.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
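
P.S. For anyone finding this thread later, here is a minimal sketch of the corrected pattern described at the top of this message: the whole stream is built inside the function handed to StreamingContext.getOrCreate, and nothing is added to the context that getOrCreate returns. This is not my actual job. The object name, the checkpoint path, the socket source (standing in for Kafka) and the counting logic are all assumptions made up for illustration; only the getOrCreate / checkpoint / start / awaitTermination structure is the point.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
// Brings in the implicit pair-DStream operations on Spark 1.1/1.2;
// assumed to be unnecessary (but harmless) on later versions.
import org.apache.spark.streaming.StreamingContext._

object RestartableStreamSketch {

  // Hypothetical checkpoint location; replace with your own directory.
  val checkpointDir = "hdfs://namenode:8020/tmp/example-checkpoint/"

  // Running count per key, carried across batches through the checkpoint.
  def updateCount(newValues: Seq[Long], running: Option[Long]): Option[Long] =
    Some(running.getOrElse(0L) + newValues.sum)

  // Only invoked when no usable checkpoint exists. Everything that defines
  // the stream (source, transformations, output) is set up in here.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RestartableStreamSketch")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Placeholder source: a socket text stream instead of the Kafka topic.
    val lines = ssc.socketTextStream("localhost", 9999)
    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1L))
      .updateStateByKey[Long](updateCount _)
    counts.print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Either restores the context (including its stream definition) from the
    // checkpoint, or calls createContext(). No stream is built after this.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout a restart goes through exactly the same main method: if the checkpoint directory holds valid data, createContext() is simply never called.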