Hi all, I apologise for re-posting this; I realise some mail systems are filtering out all the code samples from the original post.
I would greatly appreciate any pointers on this; the issue basically renders Spark Streaming not fault-tolerant for us.

Thanks in advance,

S

---

"I experience Spark Streaming restart issues similar to what is discussed in the 2 threads below (in which I failed to find a solution). Could anybody let me know if anything is wrong in the way I start/stop, or if this could be a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving an updateStateByKey and saves the result to HDFS.

The context is (re)-created at startup as follows:

    import org.apache.spark.streaming.{Duration, StreamingContext}

    def streamContext() = {
      // Only called when no usable checkpoint is found
      def newContext() = {
        val ctx = new StreamingContext(sparkConf, Duration(10000))
        ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
        ctx
      }
      StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/", newContext)
    }

And the start-up and shutdown of the stream are handled as follows:

    // Declared outside the try so that the catch block can see it
    val sparkContext = streamContext()
    try {
      // [.. build stream here...]
      sparkContext.start()
      sparkContext.awaitTermination()
    } catch {
      case e: Throwable =>
        log.error("shutting down tabulation stream...", e)
        sparkContext.stop()
        log.info("...waiting termination...")
        sparkContext.awaitTermination()
        log.info("...tabulation stream stopped")
    }

When starting the stream for the first time (with spark-submit), the processing happens successfully, folders are created on the target HDFS folder and streaming stats are visible on http://sparkhost:4040/streaming.
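
For comparison with the snippets above, here is a minimal sketch of an alternative layout in which the whole DStream graph is defined inside the function passed to StreamingContext.getOrCreate, as the Spark Streaming programming guide does for checkpoint recovery. Whether this has any bearing on the restart issues is only an assumption. The object name, the socket source (standing in for the Kafka input), the counting logic in updateCount and the output path are all hypothetical placeholders, not the actual application code.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._ // pair-DStream implicits, needed on Spark 1.1

object CheckpointedStreamSketch {
  // Same checkpoint directory as in the post.
  val checkpointDir = "hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/"

  // Hypothetical state update: keeps a running count per key.
  def updateCount(newValues: Seq[Long], state: Option[Long]): Option[Long] =
    Some(state.getOrElse(0L) + newValues.sum)

  // Everything that defines the stream lives inside the factory,
  // so the graph is part of what gets written to the checkpoint.
  def newContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("tabulation-sketch") // hypothetical app name
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Stand-in source (assumption); the real job reads from Kafka.
    val lines = ssc.socketTextStream("localhost", 9999)
    lines
      .map(line => (line, 1L))
      .updateStateByKey(updateCount _)
      // Hypothetical output location.
      .saveAsTextFiles("hdfs://vm28-hulk-priv:8020/tmp/tabulation-output/part")
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On a fresh start newContext() is called; on a restart the context
    // is rebuilt from the checkpoint files instead.
    val ssc = StreamingContext.getOrCreate(checkpointDir, () => newContext())
    ssc.start()
    ssc.awaitTermination()
  }
}
```

When a checkpoint exists, getOrCreate rebuilds the context and its DStreams from the checkpoint data and does not call newContext, so in this layout nothing is added to the graph after getOrCreate returns.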

After letting the streaming work several minutes and then stopping it (ctrl-c on the command line), the following info is visible in the checkpoint folder:

    mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
    14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 11 items
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
    -rw-r--r--   3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
    -rw-r--r--   3 mnubohadoop hadoop 5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
    -rw-r--r--   3 mnubohadoop hadoop 5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
    -rw-r--r--   3 mnubohadoop hadoop 5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk

    mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
    14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 2 items
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542

(checkpoint clean-up seems to happen since the stream ran for much more than 5 times 10 seconds)

When re-starting the stream, the startup fails with the error below, http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is added in the target folder and no new checkpoints are created:

    09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
    org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized
        at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:263) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:290) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:210) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:209) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
        at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:209) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:80) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:66) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
        at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:444) ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]

    mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
    14/09/25 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 12 items
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
    -rw-r--r--   3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
    -rw-r--r--   3 mnubohadoop hadoop 5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
    -rw-r--r--   3 mnubohadoop hadoop 5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
    -rw-r--r--   3 mnubohadoop hadoop 5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
    -rw-r--r--   3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk

Now if I delete all older checkpoints and keep only the most recent one:

    mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
    14/09/25 10:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 3 items
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:43 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
    -rw-r--r--   3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk

I end up with this (kafka?) actor non unique name error.

    10:07:25.088 [Result resolver thread-0] WARN o.a.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 3.0 (TID 73, vm21-hulk-priv.mtl.mnubo.com): akka.actor.InvalidActorNameException: actor name [Receiver-0-1411654045063] is not unique!
        akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
        akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
        akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
        akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
        akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
        akka.actor.ActorCell.attachChild(ActorCell.scala:338)
        akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
        org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
        org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
        org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

If I delete the checkpoint folder the stream starts successfully (but I lose my ongoing stream state, obviously).

We're running Spark 1.1.0 on Mesos 0.20. Our Spark assembly is packaged with CDH 5.1.0 and Hive:

    sbt/sbt clean assembly/assembly -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
    ./make-distribution.sh --tgz --skip-java-test -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive

Any comment or suggestion would be greatly appreciated."

On Thu, Sep 25, 2014 at 4:20 PM, Svend <svend.vanderve...@gmail.com> wrote:
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Systematic-error-when-re-starting-Spark-stream-unless-I-delete-all-checkpoints-tp15142.html