Re: Systematic error when re-starting Spark stream unless I delete all checkpoints
Hi again,

Just FYI, I found the mistake in my code regarding restartability of Spark Streaming: I had a method providing a context (either retrieved from a checkpoint or, if no checkpoint was available, built anew) and was then building and starting a stream on it. The mistake is that we should not build a stream on a context retrieved from a checkpoint, since such a context already contains all the necessary elements, as illustrated in RecoverableNetworkWordCount (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala).

S

On Fri, Sep 26, 2014 at 3:09 PM, Svend Vanderveken <svend.vanderve...@gmail.com> wrote:
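In other words, the stream-building code has to live inside the factory passed to getOrCreate, so it only runs when no checkpoint exists. A minimal sketch of that pattern (buildStream, the app name and the batch duration are placeholders, not my actual code):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object RestartableStream {
  val checkpointDir = "hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/"

  // Placeholder for the actual Kafka + updateStateByKey pipeline:
  // define all DStreams and output operations here, and nowhere else.
  def buildStream(ssc: StreamingContext): Unit = ???

  // Only invoked when there is no checkpoint to recover from. A context
  // recovered from a checkpoint already carries the full DStream graph,
  // so nothing may be added to it after recovery.
  def createContext(): StreamingContext = {
    val sparkConf = new SparkConf().setAppName("tabulation-stream")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    ssc.checkpoint(checkpointDir)
    buildStream(ssc)
    ssc
  }

  def main(args: Array[String]): Unit = {
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```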
Re: Systematic error when re-starting Spark stream unless I delete all checkpoints
Hi all,

I apologise for re-posting this; I realise some mail systems are filtering all the code samples from the original post.

I would greatly appreciate any pointer regarding this issue: it basically renders Spark Streaming not fault-tolerant for us.

Thanks in advance,

S

---

"
I experience Spark Streaming restart issues similar to what is discussed in the two threads below (in which I failed to find a solution). Could anybody let me know if anything is wrong in the way I start/stop, or if this could be a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving an updateStateByKey and saves the result to HDFS.

The context is (re-)created at startup as follows:

    def streamContext() = {

      def newContext() = {
        val ctx = new StreamingContext(sparkConf, Duration(1))
        ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
        ctx
      }

      StreamingContext.getOrCreate("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/", newContext)
    }

And the start-up and shutdown of the stream is handled as follows:

    try {

      val sparkContext = streamContext()

      [.. build stream here...]

      sparkContext.start()
      sparkContext.awaitTermination()

    } catch {
      case e: Throwable =>
        log.error("shutting down tabulation stream...", e)
        sparkContext.stop()
        log.info("...waiting termination...")
        sparkContext.awaitTermination()
        log.info("...tabulation stream stopped")
    }

When starting the stream for the first time (with spark-submit), the processing happens successfully, folders are created in the target HDFS folder and streaming stats are visible on http://sparkhost:4040/streaming.
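[Editor's note: as an aside, a catch block around awaitTermination only fires on failure, not on ctrl-c. A JVM shutdown hook with Spark's graceful-stop flag is a more reliable way to close the stream; a sketch, assuming `ssc` is the StreamingContext returned by streamContext() above:]

```scala
// stop(stopSparkContext, stopGracefully) is the Spark 1.x signature;
// stopGracefully = true lets already-received batches finish processing
// before the context shuts down, leaving the checkpoint in a clean state.
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}
ssc.start()
ssc.awaitTermination()
```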
After letting the streaming work several minutes and then stopping it (ctrl-c on the command line), the following info is visible in the checkpoint folder:

    mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
    14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 11 items
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
    -rw-r--r--   3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229
    -rw-r--r--   3 mnubohadoop hadoop 5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229.bk
    -rw-r--r--   3 mnubohadoop hadoop 5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230
    -rw-r--r--   3 mnubohadoop hadoop 5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230.bk
    -rw-r--r--   3 mnubohadoop hadoop 5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231.bk
    -rw-r--r--   3 mnubohadoop hadoop 5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232
    -rw-r--r--   3 mnubohadoop hadoop 5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232.bk
    -rw-r--r--   3 mnubohadoop hadoop 5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233
    -rw-r--r--   3 mnubohadoop hadoop 5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233.bk

    mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
    14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    Found 2 items
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
    drwxr-xr-x   - mnubohadoop hadoop    0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542

(checkpoint clean-up seems to happen, since the stream ran for much more than 5 times 10 seconds)

When re-starting the stream, the startup fails with the error below: http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is added in the target folder, and no new checkpoints are created:

    09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
    org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized