[ https://issues.apache.org/jira/browse/SPARK-17397?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15487691#comment-15487691 ]
Spiro Michaylov commented on SPARK-17397:
-----------------------------------------

In the form of a PR? Sure, I can do that. If I don't get to it in the next couple of days, it'll take a few weeks, but I'll eventually get to it. I'll assign it to myself, but if someone wants to yank it back, that's fine.

> Show example of what to do when awaitTermination() throws an Exception
> ----------------------------------------------------------------------
>
>                 Key: SPARK-17397
>                 URL: https://issues.apache.org/jira/browse/SPARK-17397
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Streaming
>    Affects Versions: 2.0.0
>         Environment: Linux, Scala but probably general
>            Reporter: Spiro Michaylov
>            Priority: Minor
>
> When awaitTermination() propagates an exception that was thrown while processing a batch, the StreamingContext keeps running. Perhaps this is by design, but I don't see any mention of it in the API docs or the Streaming Programming Guide. It's not clear what idiom should be used to block the thread until the context HAS been stopped when stream processing is throwing lots of exceptions.
> For example, in the following code, streaming takes the full 30 seconds to terminate, even when awaitTermination() has already thrown. My hope in asking this is to improve my own understanding and perhaps inspire documentation improvements. I'm not filing a bug because it's not clear to me whether this is working as intended.
> {code}
> import scala.collection.mutable
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val conf = new SparkConf().setAppName("ExceptionPropagation").setMaster("local[4]")
> val sc = new SparkContext(conf)
> // streams will produce data every second
> val ssc = new StreamingContext(sc, Seconds(1))
> // the original used the reporter's own QueueMaker helper (not part of Spark);
> // a plain queueStream backed by a mutable queue stands in for it here
> val rddQueue = new mutable.Queue[RDD[Int]]()
> // create the stream
> val stream = ssc.queueStream(rddQueue)
> // register for data: every element throws, so every batch fails
> stream
>   .map(x => { throw new RuntimeException("something"); x })
>   .foreachRDD(r => println("*** count = " + r.count()))
> // start streaming
> ssc.start()
> // stop the context after 30 seconds, whatever happens in the meantime
> new Thread("Delayed Termination") {
>   override def run(): Unit = {
>     Thread.sleep(30000)
>     ssc.stop()
>   }
> }.start()
> println("*** producing data")
> // start producing data: queueStream consumes one queued RDD per batch
> for (_ <- 1 to 10) {
>   rddQueue.synchronized { rddQueue += sc.makeRDD(1 to 100) }
> }
> try {
>   ssc.awaitTermination()
>   println("*** streaming terminated")
> } catch {
>   case e: Exception =>
>     println("*** streaming exception caught in monitor thread")
> }
> // if the above goes down the exception path, there seems no
> // good way to block here until the streaming context is stopped
> println("*** done")
> {code}
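One possible idiom, sketched under assumptions rather than taken from Spark's documentation: treat an exception from awaitTermination() as a signal to stop the context yourself, so that the monitor thread only continues once the context is really down. `awaitStopped` is a hypothetical helper name, not a Spark API. The sketch assumes, as the report observes, that the StreamingContext is still active after awaitTermination() throws, and that calling `stop()` from the monitor thread is safe. It is written as a top-level `def` for a script or spark-shell session, matching the style of the snippet above.

```scala
import org.apache.spark.streaming.StreamingContext

// Hypothetical helper (not part of Spark): block the calling thread until
// `ssc` has actually been stopped, even if awaitTermination() rethrows an
// error reported by a failed batch while the context is still running.
def awaitStopped(ssc: StreamingContext): Unit = {
  try {
    // Returns normally only once the context has been stopped; otherwise it
    // rethrows an error reported by a batch job.
    ssc.awaitTermination()
  } catch {
    case e: Exception =>
      println("*** streaming exception caught in monitor thread: " + e.getMessage)
      // Assumption: the context is still active here, so shut it down
      // explicitly instead of waiting for some other thread to call stop().
      // stopGracefully = false: do not wait for already-received data.
      ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

In the reporter's example, the `try`/`catch` around `ssc.awaitTermination()` would become a single `awaitStopped(ssc)` call. If the "Delayed Termination" thread later calls `ssc.stop()` on the already stopped context, Spark should only log a warning rather than fail, though that is worth confirming against the version in use.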