[ https://issues.apache.org/jira/browse/SPARK-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15487691#comment-15487691 ]

Spiro Michaylov commented on SPARK-17397:
-----------------------------------------

In the form of a PR? Sure, I can do that. If I don't get to it in the next 
couple of days it'll take a few weeks, but I'll get to it eventually. I'll 
assign it to myself, but if someone wants to yank it back, that's fine. 
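
Roughly what I have in mind for the example, sketched against the 2.0 API and untested (the `while` loop and the explicit `stop()` call in the catch block are my assumptions about the right idiom, not anything the docs currently say):

{code}
// When awaitTermination() propagates a batch exception, the context keeps
// running, so stop it ourselves and only then consider ourselves done.
var stopped = false
while (!stopped) {
  try {
    ssc.awaitTermination()   // blocks until stop() is called or an error surfaces
    stopped = true           // normal termination path
  } catch {
    case e: Exception =>
      println("*** streaming exception caught: " + e.getMessage)
      // non-graceful stop; also tears down the underlying SparkContext
      ssc.stop(stopSparkContext = true, stopGracefully = false)
      stopped = true         // once stop() returns, the context is down
  }
}
{code}

An alternative would be polling with awaitTerminationOrTimeout() instead of the try/catch loop; whichever we settle on, the point of the doc change is to show one blessed way to block until the context is really stopped.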

> Show example of what to do when awaitTermination() throws an Exception
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17397
>                 URL: https://issues.apache.org/jira/browse/SPARK-17397
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 2.0.0
>         Environment: Linux, Scala but probably general
>            Reporter: Spiro Michaylov
>            Priority: Minor
>
> When awaitTermination propagates an exception that was thrown in processing a 
> batch, the StreamingContext keeps running. Perhaps this is by design, but I 
> don't see any mention of it in the API docs or the streaming programming 
> guide. It's not clear what idiom should be used to block the thread until the 
> context has actually been stopped in a situation where stream processing is 
> throwing lots of exceptions. 
> For example, in the following, streaming takes the full 30 seconds to 
> terminate. My hope in asking this is to improve my own understanding and 
> perhaps inspire documentation improvements. I'm not filing a bug because it's 
> not clear to me whether this is working as intended. 
> {code}
>     val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
>     val sc = new SparkContext(conf)
>     // streams will produce data every second
>     val ssc = new StreamingContext(sc, Seconds(1))
>     val qm = new QueueMaker(sc, ssc)
>     // create the stream
>     val stream = // create some stream
>     // register for data
>     stream
>       .map(x => { throw new SomeException("something"); x })
>       .foreachRDD(r => println("*** count = " + r.count()))
>     // start streaming
>     ssc.start()
>     new Thread("Delayed Termination") {
>       override def run() {
>         Thread.sleep(30000)
>         ssc.stop()
>       }
>     }.start()
>     println("*** producing data")
>     // start producing data
>     qm.populateQueue()
>     try {
>       ssc.awaitTermination()
>       println("*** streaming terminated")
>     } catch {
>       case e: Exception => {
>         println("*** streaming exception caught in monitor thread")
>       }
>     }
>     // if the above goes down the exception path, there seems no 
>     // good way to block here until the streaming context is stopped 
>     println("*** done")
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
