[ 
https://issues.apache.org/jira/browse/SPARK-20323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15968969#comment-15968969
 ] 

Andrei Taleanu commented on SPARK-20323:
----------------------------------------

[~srowen] I see. Let me describe the problem better. Short version: I have 
a *streaming job*. If I let Spark alone handle the thrown exceptions, 
*processing continues even though a batch has failed*. This translates to data 
loss and loss of at-least-once semantics.

Detailed version: I started originally from an app we run on Spark 2.1.0 on top 
of Mesos w/ Hadoop 2.6, checkpointing disabled (it's done "manually" as you'll 
see below). I tried narrowing it down as much as possible to reproduce a 
similar issue in local mode, just for illustration purposes (that's where 
the code I put in the issue description came from). Consider the following 
use-case:
{noformat}
1) read data from a Kafka source
2) transform the dstream:
  a) get data from an external service to avoid too many calls from executors 
(might fail)
  b) broadcast the data
  c) map the RDD using the broadcast value
3) cache the transformed dstream
4) foreach RDD write cached data into a db (might fail)
5) foreach RDD:
  a) write cached data in Kafka (might fail)
  b) manually commit the new Kafka offsets (because I need a human readable 
format)
{noformat}
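
To make the ordering constraint concrete, here is a minimal plain-Scala sketch of one micro-batch with no Spark involved. All names ({{Batch}}, {{processBatch}} and the function parameters) are hypothetical stand-ins for the real steps, not code from the actual app. The only point is that the offset commit (5.b) runs only if every earlier step succeeded.

```scala
import scala.util.{Failure, Success, Try}

object MicroBatchSketch {
  // Hypothetical model of one micro-batch: its records and the offset to commit.
  final case class Batch(records: Seq[String], untilOffset: Long)

  // Each function parameter is an assumed stand-in for a step that might fail.
  def processBatch(
      batch: Batch,
      fetchReference: () => Map[String, String], // step 2.a
      writeToDb: Seq[String] => Unit,            // step 4
      writeToKafka: Seq[String] => Unit,         // step 5.a
      commitOffset: Long => Unit                 // step 5.b
  ): Try[Long] =
    for {
      reference <- Try(fetchReference())
      // step 2.c: enrich each record using the fetched reference data
      enriched = batch.records.map(r => reference.getOrElse(r, r))
      _ <- Try(writeToDb(enriched))
      _ <- Try(writeToKafka(enriched))
      // Reached only if every step above succeeded.
      _ <- Try(commitOffset(batch.untilOffset))
    } yield batch.untilOffset
}
```

On a {{Failure}} the caller would stop the job instead of moving on to the next batch, so the uncommitted offsets get re-read after a restart.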

There are multiple points of failure here (e.g. 2.a), and what I need is to 
fail as soon as possible: step 5.b commits the offsets, so reaching it after an 
earlier step of the same micro-batch has failed means data loss. Obviously, 
manipulating the context in transform is wrong. Doing this in foreachRDD in the 
same thread is also wrong (as pointed out by [~zsxwing] in SPARK-20321).

What's the recommended way to handle this? If I just let Spark alone handle 
exceptions it seems to somehow ignore them (2.a case for example) and continue 
processing. Since this means data loss, I need to avoid it (I need 
at-least-once guarantees).
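
For illustration only, this is the kind of fail-fast structure I have in mind. It is a sketch under assumptions, not something confirmed in this thread as the recommended approach. The driver-side closure only records the first failure, and the main thread polls with {{awaitTerminationOrTimeout}} and calls {{stop}} itself, so the context is never stopped from inside transform/foreachRDD. {{FailFastSketch}}, {{run}} and {{processBatch}} are hypothetical names, and the queue-backed stream just mirrors the local-mode reproduction.

```scala
import java.util.concurrent.atomic.AtomicReference

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.util.control.NonFatal

object FailFastSketch {
  // Runs a queue-backed streaming job and returns the first failure raised by
  // `processBatch` (a hypothetical stand-in for steps 2-5), if any.
  def run(processBatch: RDD[Int] => Unit): Option[Throwable] = {
    val conf = new SparkConf().setAppName("fail-fast-sketch").setMaster("local[*]")
    val ssc  = new StreamingContext(conf, Seconds(1))
    val firstFailure = new AtomicReference[Throwable](null)

    val elems = (1 to 100).grouped(10)
      .map(seq => ssc.sparkContext.parallelize(seq))
      .toSeq
    val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

    stream.foreachRDD { rdd =>
      // Runs on the driver. After the first failure, skip all later batches so
      // nothing downstream (e.g. an offset commit) happens for them.
      if (firstFailure.get() == null) {
        try processBatch(rdd)
        catch { case NonFatal(e) => firstFailure.compareAndSet(null, e) }
      }
    }

    ssc.start()
    try {
      // Poll from the main thread instead of calling stop() inside the closure.
      while (firstFailure.get() == null && !ssc.awaitTerminationOrTimeout(500)) {}
    } finally {
      ssc.stop(stopSparkContext = true, stopGracefully = false)
    }
    Option(firstFailure.get())
  }
}
```

With something like this the main method could rethrow the returned failure (or exit non-zero) so that the scheduler restarts the job from the last committed offsets.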

Thanks again :)

> Calling stop in a transform stage causes the app to hang
> --------------------------------------------------------
>
>                 Key: SPARK-20323
>                 URL: https://issues.apache.org/jira/browse/SPARK-20323
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Andrei Taleanu
>
> I'm not sure if this is a bug or just the way it needs to happen, but I've 
> run into this issue with the following code:
> {noformat}
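> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
> import scala.collection.mutable
> import scala.util.Random
>
> object ImmortalStreamingJob extends App {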
>   val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
>   val ssc  = new StreamingContext(conf, Seconds(1))
>   val elems = (1 to 1000).grouped(10)
>     .map(seq => ssc.sparkContext.parallelize(seq))
>     .toSeq
>   val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))
>   val transformed = stream.transform { rdd =>
>     try {
>       if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
>       else println("lucky bastard")
>       rdd
>     } catch {
>       case e: Throwable =>
>         println(s"stopping streaming context: $e")
>         ssc.stop(stopSparkContext = true, stopGracefully = false)
>         throw e
>     }
>   }
>   transformed.foreachRDD { rdd =>
>     println(rdd.collect().mkString(","))
>   }
>   ssc.start()
>   ssc.awaitTermination()
> }
> {noformat}
> There are two things I can note here:
> * if the exception is thrown in the first transformation (when the first RDD 
> is processed), the spark context is stopped and the app dies
> * if the exception is thrown after at least one RDD has been processed, the 
> app hangs after printing the error message and never stops
> I think there's some sort of deadlock in the second case, is that normal? I 
> also asked this 
> [here|http://stackoverflow.com/questions/43273783/immortal-spark-streaming-job/43373624#43373624]
>  but up to this point there's no answer pointing exactly to what happens, 
> only guidelines.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
