Hi again,

Just FYI, I found the mistake in my code regarding the restartability of
Spark Streaming: I had a method providing a context (either retrieved from
the checkpoint or, if no checkpoint was available, built anew), and I was
then building and starting a stream on it.

The mistake is that we should not build a stream on a context retrieved
from a checkpoint, since it already contains all the necessary elements
(the stream definition is restored together with the context). The stream
should only be built when a new context is created, as also illustrated in
the RecoverableNetworkWordCount example:
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala
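
A minimal sketch of that structure, under stated assumptions: the object
name, the checkpoint path and the socket source on localhost:9999 are
placeholders chosen for illustration (the stream discussed in this thread
reads from Kafka), and this is not the actual application code. The only
point it tries to show is that the whole stream is defined inside the
function handed to StreamingContext.getOrCreate, which is invoked only when
no checkpoint can be loaded.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RestartableStream {

  // Hypothetical checkpoint location, for illustration only.
  val checkpointDir = "hdfs://namenode:8020/tmp/example-checkpoint/"

  // Called by getOrCreate only when no usable checkpoint is found.
  // Everything that defines the stream lives in here.
  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("RestartableStream")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir)

    // Stand-in source and processing (assumption: a text socket instead of Kafka).
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()

    ssc
  }

  def main(args: Array[String]): Unit = {
    // Either restores the context (including its stream definition) from the
    // checkpoint, or builds a fresh one through createContext.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)

    // No stream is built here: the context is only started.
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this layout nothing is added to a context that was recovered from a
checkpoint, which is the situation that appears to have led to the "has not
been initialized" error quoted below.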

S

On Fri, Sep 26, 2014 at 3:09 PM, Svend Vanderveken <
svend.vanderve...@gmail.com> wrote:

> Hi all,
>
> I apologise for re-posting this; I realise some mail systems are filtering
> all the code samples from the original post.
>
> I would greatly appreciate any pointer regarding this issue, which
> basically renders Spark Streaming not fault-tolerant for us.
>
> Thanks in advance,
>
> S
>
>
>
> ---
>
> "
> I experience spark streaming restart issues similar to what is discussed
> in the 2 threads below (in which I failed to find a solution). Could
> anybody let me know if anything is wrong in the way I start/stop or if this
> could be a spark bug?
>
>
> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
>
> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html
>
> My stream reads a Kafka topic, does some processing involving an
> updateStateByKey and saves the result to HDFS.
>
> The context is (re-)created at startup as follows:
>
> def streamContext() = {
>
>     def newContext() = {
>       val ctx = new StreamingContext(sparkConf, Duration(10000))
>       ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
>       ctx
>     }
>
>     StreamingContext.getOrCreate(
>       "hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/", newContext)
>   }
>
>
> And the start-up and shutdown of the stream is handled as follows:
>
> // Declared outside the try block so it is also in scope in the catch clause.
> val sparkContext = streamContext()
>
> try {
>
>     [.. build stream here...]
>
>     sparkContext.start()
>     sparkContext.awaitTermination()
>
>   } catch {
>       case e: Throwable =>
>         log.error("shutting down tabulation stream...", e)
>         sparkContext.stop()
>         log.info("...waiting termination...")
>         sparkContext.awaitTermination()
>         log.info("...tabulation stream stopped")
>   }
>
>
>
> When starting the stream for the first time (with spark-submit), the
> processing happens successfully, folders are created in the target HDFS
> folder, and streaming stats are visible on the streaming page
> (http://sparkhost:4040/streaming).
>
> After letting the stream run for several minutes and then stopping it
> (Ctrl-C on the command line), the following is visible in the
> checkpoint folder:
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
> 14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> Found 11 items
> drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
> -rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
> -rw-r--r--   3 mnubohadoop hadoop       5512 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
> -rw-r--r--   3 mnubohadoop hadoop       5507 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5476 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
> -rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5477 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
> -rw-r--r--   3 mnubohadoop hadoop       5506 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
> -rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
> 14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> Found 2 items
> drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
> drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542
>
>
> (Checkpoint clean-up seems to be happening: only 5 checkpoints remain
> although the stream ran for much longer than 5 times 10 seconds.)
>
> When re-starting the stream, the startup fails with the error below,
> http://sparkhost:4040/streaming shows no statistics, no new HDFS folder
> is added in the target folder, and no new checkpoints are created:
>
> 09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down 
> tabulation stream...
> org.apache.spark.SparkException: 
> org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been 
> initialized
>         at 
> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:263) 
> ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:290) 
> ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>  ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>  ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
> ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
> ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
> ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:115) 
> ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:210)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:209)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>  ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) 
> ~[mnubo-analytic-tabulatestreaming-assembly-0.1.jar:0.1]
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:209)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:80)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:66)
>  ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>         at 
> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:444) 
> ~[spark-assembly-1.1.0-hadoop2.3.0-mr1-cdh5.1.0.jar:1.1.0]
>
>
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
> 14/09/25 09:48:39 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> Found 12 items
> drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
> drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:43 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
> -rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000
> -rw-r--r--   3 mnubohadoop hadoop       5512 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652290000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5479 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000
> -rw-r--r--   3 mnubohadoop hadoop       5507 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652300000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5476 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000
> -rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652310000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5477 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000
> -rw-r--r--   3 mnubohadoop hadoop       5506 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652320000.bk
> -rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
> -rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
>
>
> Now if I delete all older checkpoints and keep only the most recent one:
>
> mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
> 14/09/25 10:06:08 WARN util.NativeCodeLoader: Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> Found 3 items
> drwxr-xr-x   - mnubohadoop hadoop          0 2014-09-25 09:43 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/a9dded7b-a288-44da-89d0-0309a73fab3a
> -rw-r--r--   3 mnubohadoop hadoop       5484 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000
> -rw-r--r--   3 mnubohadoop hadoop       5504 2014-09-25 09:38 
> hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-1411652330000.bk
>
>
> I end up with this (Kafka?) non-unique actor name error:
>
> 10:07:25.088 [Result resolver thread-0] WARN  
> o.a.spark.scheduler.TaskSetManager - Lost task 1.0 in stage 3.0 (TID 73, 
> vm21-hulk-priv.mtl.mnubo.com): akka.actor.InvalidActorNameException: actor 
> name [Receiver-0-1411654045063] is not unique!
>         
> akka.actor.dungeon.ChildrenContainer$NormalChildrenContainer.reserve(ChildrenContainer.scala:130)
>         akka.actor.dungeon.Children$class.reserveChild(Children.scala:77)
>         akka.actor.ActorCell.reserveChild(ActorCell.scala:338)
>         akka.actor.dungeon.Children$class.makeChild(Children.scala:186)
>         akka.actor.dungeon.Children$class.attachChild(Children.scala:42)
>         akka.actor.ActorCell.attachChild(ActorCell.scala:338)
>         akka.actor.ActorSystemImpl.actorOf(ActorSystem.scala:518)
>         
> org.apache.spark.streaming.receiver.ReceiverSupervisorImpl.<init>(ReceiverSupervisorImpl.scala:67)
>         
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:263)
>         
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$9.apply(ReceiverTracker.scala:257)
>         
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>         
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1121)
>         org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>         org.apache.spark.scheduler.Task.run(Task.scala:54)
>         org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
>         
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>
> If I delete the checkpoint folder the stream starts successfully (but I
> lose my ongoing stream state, obviously)
>
> We're running spark 1.1.0 on Mesos 0.20. Our spark assembly is packaged
> with CDH 5.1.0 and Hive:
>
> sbt/sbt clean assembly/assembly -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
> ./make-distribution.sh --tgz --skip-java-test 
> -Dhadoop.version=2.3.0-mr1-cdh5.1.0 -Phive
>
>
> Any comment or suggestion would be greatly appreciated.
> "
>
> On Thu, Sep 25, 2014 at 4:20 PM, Svend <svend.vanderve...@gmail.com>
> wrote:
>
>> I experience spark streaming restart issues similar to what is discussed
>> in
>> the 2 threads below (in which I failed to find a solution). Could anybody
>> let me know if anything is wrong in the way I start/stop or if this could
>> be
>> a spark bug?
>>
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html
>>
>> My stream reads a Kafka topic, does some processing involving an
>> updateStateByKey and saves the result to HDFS.
>>
>> The context is (re-)created at startup as follows:
>>
>>
>>
>> And the start-up and shutdown of the stream is handled as follows:
>>
>>
>>
>>
>> When starting the stream for the first time (with spark-submit), the
>> processing happens successfully, folders are created in the target HDFS
>> folder, and streaming stats are visible on the streaming page
>> (http://sparkhost:4040/streaming).
>>
>> After letting the streaming work several minutes and then stopping it
>> (ctrl-c on the command line), the following info is visible in the
>> checkpoint folder:
>>
>>
>>
>> (checkpoint clean-up seems to happen since the stream ran for much more
>> than
>> 5 times 10 seconds)
>>
>> When re-starting the stream, the startup fails with the error below,
>> http://sparkhost:4040/streaming shows no statistics, no new HDFS folder
>> is added in the target folder, and no new checkpoints are created:
>>
>>
>>
>>
>>
>>
>> Now if I delete all older checkpoints and keep only the most recent one:
>>
>>
>>
>> I end up with this (Kafka?) non-unique actor name error:
>>
>>
>>
>> If I delete the checkpoint folder the stream starts successfully (but I
>> lose
>> my ongoing stream state, obviously)
>>
>> We're running spark 1.1.0 on Mesos 0.20. Our spark assembly is packaged
>> with
>> CDH 5.1.0 and Hive:
>>
>>
>>
>> Any comment or suggestion would be greatly appreciated.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Systematic-error-when-re-starting-Spark-stream-unless-I-delete-all-checkpoints-tp15142.html
>>
>
