Re: Systematic error when re-starting Spark stream unless I delete all checkpoints

2014-09-30 Thread Svend Vanderveken
Hi again,


Just FYI, I found the mistake in my code regarding the restartability of Spark
Streaming: I had a method providing a context (either retrieved from a
checkpoint or, if no checkpoint was available, built anew) and was then
building and starting a stream on it.

The mistake is that we should not build a stream on a context retrieved from a
checkpoint, since it already contains all the necessary elements, as also
illustrated in the RecoverableNetworkWordCount example
(https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/RecoverableNetworkWordCount.scala)
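
In other words, the whole DStream graph should be defined inside the function
passed to StreamingContext.getOrCreate, so that it is only built when no
checkpoint exists. A minimal sketch of that pattern (app name, host, port,
batch interval and checkpoint path are placeholders, assuming the Spark 1.x
streaming API):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._

val checkpointDir = "hdfs://namenode:8020/tmp/examplestream/"  // placeholder path

def createContext(): StreamingContext = {
  val ssc = new StreamingContext(new SparkConf().setAppName("example"), Seconds(10))
  ssc.checkpoint(checkpointDir)

  // Build the whole stream here, inside the creating function: it is only
  // executed when no checkpoint exists.
  val lines = ssc.socketTextStream("localhost", 9999)           // placeholder source
  val counts = lines.map(word => (word, 1L)).updateStateByKey[Long] {
    (newValues: Seq[Long], state: Option[Long]) => Some(state.getOrElse(0L) + newValues.sum)
  }
  counts.print()
  ssc
}

// On restart the graph is restored from the checkpoint and createContext()
// is not called, so no stream must be built on the returned context.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()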

S

Re: Systematic error when re-starting Spark stream unless I delete all checkpoints

2014-09-26 Thread Svend Vanderveken
Hi all,

I apologise for re-posting this; I realise some mail systems are filtering out
all the code samples from the original post.

I would greatly appreciate any pointer regarding this issue; it basically
renders Spark Streaming non-fault-tolerant for us.

Thanks in advance,

S



---

"
I am experiencing Spark Streaming restart issues similar to those discussed in
the two threads below (in which I failed to find a solution). Could anybody
let me know whether anything is wrong in the way I start/stop the stream, or
whether this could be a Spark bug?

http://apache-spark-user-list.1001560.n3.nabble.com/RDD-data-checkpoint-cleaning-td14847.html
http://apache-spark-user-list.1001560.n3.nabble.com/KafkaReciever-Error-when-starting-ssc-Actor-name-not-unique-tc3978.html

My stream reads a Kafka topic, does some processing involving an
updateStateByKey and saves the result to HDFS.
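
For reference, a pipeline of that shape might look roughly like the sketch
below (ZooKeeper quorum, consumer group, topic name and output prefix are
placeholders, assuming the spark-streaming-kafka artifact and the Spark 1.x
receiver-based createStream API):

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka.KafkaUtils

def buildStream(ctx: StreamingContext): Unit = {
  // Receiver-based Kafka source: (key, message) pairs from the "events" topic.
  val messages = KafkaUtils.createStream(ctx, "zkhost:2181", "tabulation-group", Map("events" -> 1))

  // Running count per message value, carried across batches via updateStateByKey.
  val counts = messages
    .map { case (_, value) => (value, 1L) }
    .updateStateByKey[Long] { (newValues: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newValues.sum)
    }

  // One output directory per batch under the given prefix on HDFS.
  counts.saveAsTextFiles("hdfs://namenode:8020/tmp/tabulationresult/batch")
}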

The context is (re)-created at startup as follows:

def streamContext() = {

  def newContext() = {
    val ctx = new StreamingContext(sparkConf, Duration(1))
    ctx.checkpoint("hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/")
    ctx
  }

  StreamingContext.getOrCreate(
    "hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/",
    newContext)
}


And the start-up and shutdown of the stream is handled as follows:

try {

  val sparkContext = streamContext()

  [.. build stream here...]

  sparkContext.start()
  sparkContext.awaitTermination()

} catch {
  case e: Throwable =>
    log.error("shutting down tabulation stream...", e)
    sparkContext.stop()
    log.info("...waiting termination...")
    sparkContext.awaitTermination()
    log.info("...tabulation stream stopped")
}
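
As a side note, an alternative to relying on ctrl-c plus the catch block above
is to request a graceful stop from a JVM shutdown hook; a minimal sketch,
assuming the Spark 1.x stop(stopSparkContext, stopGracefully) overload:

val ssc = streamContext()

// Ask for a graceful stop when the JVM shuts down (e.g. on ctrl-c), so that
// in-flight batches are completed before the checkpoints are closed.
sys.addShutdownHook {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}

ssc.start()
ssc.awaitTermination()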



When the stream is started for the first time (with spark-submit), processing
happens successfully, folders are created in the target HDFS folder and
streaming stats are visible at http://sparkhost:4040/streaming.

After letting the stream run for several minutes and then stopping it (ctrl-c
on the command line), the following content is visible in the checkpoint
folder:

mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/
14/09/25 09:39:13 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 11 items
drwxr-xr-x   - mnubohadoop hadoop  0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
-rw-r--r--   3 mnubohadoop hadoop   5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229
-rw-r--r--   3 mnubohadoop hadoop   5512 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165229.bk
-rw-r--r--   3 mnubohadoop hadoop   5479 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230
-rw-r--r--   3 mnubohadoop hadoop   5507 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165230.bk
-rw-r--r--   3 mnubohadoop hadoop   5476 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231
-rw-r--r--   3 mnubohadoop hadoop   5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165231.bk
-rw-r--r--   3 mnubohadoop hadoop   5477 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232
-rw-r--r--   3 mnubohadoop hadoop   5506 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165232.bk
-rw-r--r--   3 mnubohadoop hadoop   5484 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233
-rw-r--r--   3 mnubohadoop hadoop   5504 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/checkpoint-141165233.bk
mnubohadoop@vm28-hulk-priv:~/streamingtabulate$ hdfs dfs -ls hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8
14/09/25 09:42:08 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - mnubohadoop hadoop  0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8438
drwxr-xr-x   - mnubohadoop hadoop  0 2014-09-25 09:38 hdfs://vm28-hulk-priv:8020/tmp/tabulationstream/0d6a18a7-6d00-407a-bfd7-e40829f6d2a8/rdd-8542


(Checkpoint clean-up seems to happen, since the stream ran for much longer
than 5 × 10 seconds.)

When the stream is re-started, startup fails with the error below;
http://sparkhost:4040/streaming shows no statistics, no new HDFS folder is
added in the target folder and no new checkpoints are created:

09:45:05.038 [main] ERROR c.mnubo.analytic.tabulate.StreamApp - shutting down tabulation stream...
org.apache.spark.SparkException: org.apache.spark.streaming.dstream.FilteredDStream@e8949a1 has not been initialized