[jira] [Commented] (SPARK-20323) Calling stop in a transform stage causes the app to hang
[ https://issues.apache.org/jira/browse/SPARK-20323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15969042#comment-15969042 ]

Andrei Taleanu commented on SPARK-20323:

Ok, thank you. Unfortunately I can't even get at-least-once semantics - you can see that from the example I provided in the last comment. Anyway, I'll try that.

> Calling stop in a transform stage causes the app to hang
>
> Key: SPARK-20323
> URL: https://issues.apache.org/jira/browse/SPARK-20323
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Andrei Taleanu
>
> I'm not sure if this is a bug or just the way it needs to happen, but I've run into this issue with the following code:
>
> {noformat}
> object ImmortalStreamingJob extends App {
>   val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
>   val ssc = new StreamingContext(conf, Seconds(1))
>
>   val elems = (1 to 1000).grouped(10)
>     .map(seq => ssc.sparkContext.parallelize(seq))
>     .toSeq
>   val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))
>
>   val transformed = stream.transform { rdd =>
>     try {
>       if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
>       else println("lucky bastard")
>       rdd
>     } catch {
>       case e: Throwable =>
>         println("stopping streaming context", e)
>         ssc.stop(stopSparkContext = true, stopGracefully = false)
>         throw e
>     }
>   }
>
>   transformed.foreachRDD { rdd =>
>     println(rdd.collect().mkString(","))
>   }
>
>   ssc.start()
>   ssc.awaitTermination()
> }
> {noformat}
>
> There are two things I can note here:
> * if the exception is thrown in the first transformation (when the first RDD is processed), the spark context is stopped and the app dies
> * if the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops
>
> I think there's some sort of deadlock in the second case, is that normal? I also asked this [here|http://stackoverflow.com/questions/43273783/immortal-spark-streaming-job/43373624#43373624] but up to this point there's no answer pointing exactly to what happens, only guidelines.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
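For context, the advice being tried in the comment above is the commonly recommended one: never call ssc.stop() from the thread that runs the streaming computation itself, but hand the shutdown off to a separate thread. Below is a minimal, Spark-free sketch of that pattern; AsyncStop and stopFn are illustrative names not taken from this thread, and in a real job stopFn would be () => ssc.stop(stopSparkContext = true, stopGracefully = false):

```scala
// Hand shutdown off to a separate thread so the caller (e.g. the
// streaming scheduler thread that runs a transform body on the driver)
// never blocks waiting for its own termination.
object AsyncStop {
  def stopAsync(stopFn: () => Unit): Thread = {
    val t = new Thread("streaming-stopper") {
      override def run(): Unit = stopFn()
    }
    t.setDaemon(true) // do not keep the JVM alive for the stopper itself
    t.start()
    t
  }
}
```

The key point of the design is that the scheduler thread only *triggers* the stop and returns immediately, so the stop call never deadlocks against the thread it was issued from.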
[jira] [Commented] (SPARK-20323) Calling stop in a transform stage causes the app to hang
[ https://issues.apache.org/jira/browse/SPARK-20323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968969#comment-15968969 ]

Andrei Taleanu commented on SPARK-20323:

[~srowen] I see. Let me describe the problem in more detail.

Short version: I have a *streaming job*. Even though *a batch fails, processing continues* if I let Spark alone handle the thrown exceptions. This translates to data loss and the loss of at-least-once semantics.

Detailed version: I started originally from an app we run on Spark 2.1.0 on top of Mesos w/ Hadoop 2.6, checkpointing disabled (it's done "manually", as you'll see below). I tried narrowing it down as much as possible to reproduce a similar issue in local mode, just for illustration purposes (that's where the code I put in the issue description came from). Consider the following use-case:

{noformat}
1) read data from a Kafka source
2) transform the dstream:
   a) get data from an external service to avoid too many calls from executors (might fail)
   b) broadcast the data
   c) map the RDD using the broadcast value
3) cache the transformed dstream
4) foreach RDD: write cached data into a db (might fail)
5) foreach RDD:
   a) write cached data in Kafka (might fail)
   b) manually commit the new Kafka offsets (because I need a human-readable format)
{noformat}

There are multiple points of failure here (e.g. 2.a) and what I need is to fail as soon as possible (see 5.b, which means data loss if anything prior to it failed in a micro-batch). Obviously manipulating the context in transform is wrong. Obviously doing this in foreachRDD in the same thread is again wrong (as noted by [~zsxwing] in SPARK-20321). What's the recommended way to handle this? If I just let Spark alone handle exceptions it seems to somehow ignore them (the 2.a case, for example) and continue processing. Since this means data loss I need to avoid it (I need at-least-once guarantees).
Thanks again :)
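One way to get the fail-fast behaviour asked for above, without stopping the context from inside a stage, is to signal the first failure to the main driver thread and let it perform the stop. This is only a sketch under assumptions: FailFast, signalFailure, and awaitFailure are invented names, and the real shutdown call (ssc.stop(stopSparkContext = true, stopGracefully = false)) would live in the main thread's catch block.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// The first failure from any stage (transform body, foreachRDD body, ...)
// fails the promise; later failures are ignored. The main driver thread
// blocks on the future after ssc.start() and does the stopping itself.
object FailFast {
  private val firstError = Promise[Nothing]()

  // Call from catch blocks inside transform/foreachRDD closures.
  def signalFailure(e: Throwable): Unit = firstError.tryFailure(e)

  // Call from the main thread after ssc.start(); rethrows the first error.
  def awaitFailure(): Nothing = Await.result(firstError.future, Duration.Inf)
}
```

Usage in the driver would be roughly: start the context, call awaitFailure() in a try/catch, and stop the context (non-gracefully, if at-least-once matters more than finishing the batch) once an error surfaces.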
[jira] [Commented] (SPARK-20323) Calling stop in a transform stage causes the app to hang
[ https://issues.apache.org/jira/browse/SPARK-20323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15968940#comment-15968940 ]

Andrei Taleanu commented on SPARK-20323:

[~srowen] could you please give me a link that documents this bad practice? I understood and expected that this was an incorrect approach, since transformations are lazy. However, I asked the question because of what I'd call a common problem: you have a streaming app, and when something goes wrong on either the driver or the executors you want to fail fast in order to avoid data loss or corruption. Stopping the context seems the most straightforward way, but doing so appears not to be that easy. So how should one handle this case? Thanks.
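As a hedged sketch of one answer to "how should one handle this case": set a @volatile failure flag inside the stage's catch block and poll it from the main thread, which is free to call stop(). The names below are illustrative; in a real driver the polling loop would be built around ssc.awaitTerminationOrTimeout(pollMs), with ssc.stop(stopSparkContext = true, stopGracefully = false) called once the flag is seen.

```scala
// A failing stage only sets a flag; the main thread observes it and
// performs the shutdown, so stop() is never called from the scheduler
// thread that runs the stage bodies.
object FlagStop {
  @volatile var failed = false

  // Call from catch blocks inside transform/foreachRDD closures.
  def markFailed(): Unit = failed = true

  // Main-thread loop: returns true as soon as the flag is seen, false
  // after maxPolls quiet polls. A real driver would use
  // ssc.awaitTerminationOrTimeout(pollMs) as the wait between polls.
  def pollForFailure(pollMs: Long, maxPolls: Int): Boolean = {
    var polls = 0
    while (!failed && polls < maxPolls) {
      Thread.sleep(pollMs)
      polls += 1
    }
    failed
  }
}
```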
[jira] [Commented] (SPARK-20323) Calling stop in a transform stage causes the app to hang
[ https://issues.apache.org/jira/browse/SPARK-20323?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967516#comment-15967516 ]

Andrei Taleanu commented on SPARK-20323:

All right, so the behaviour is undefined in that case?
[jira] [Commented] (SPARK-20321) Spark UI cannot be shutdown in spark streaming app
[ https://issues.apache.org/jira/browse/SPARK-20321?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15967514#comment-15967514 ]

Andrei Taleanu commented on SPARK-20321:

I'm not sure how that would help. I could provide more logs, but they're typical stopping logs. Also, the SparkUI remains accessible when this happens and you can see a single job that runs forever. I've tried recreating a minimal example, but somehow I can't reproduce it in another minimal streaming app. In the app where I've encountered this issue it happens every time, so it's really easy to reproduce there. I'm not sure what I should be looking at in order to provide more details.

> Spark UI cannot be shutdown in spark streaming app
>
> Key: SPARK-20321
> URL: https://issues.apache.org/jira/browse/SPARK-20321
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.1.0
> Reporter: Andrei Taleanu
>
> When an exception thrown in the transform stage is handled in foreachRDD and the streaming context is forced to stop, the SparkUI appears to hang and continually dumps the following logs in an infinite loop:
>
> {noformat}
> ...
> 2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop woken up from select, 0/0 selected
> 2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop waiting on select
> 2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop woken up from select, 0/0 selected
> 2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop waiting on select
> ...
> {noformat}
>
> Unfortunately I don't have a minimal example that reproduces this issue, but here is what I can share:
>
> {noformat}
> val dstream = ... // pull data from Kafka
> val mapped = dstream.transform { rdd =>
>   val data = getData // perform a call that potentially throws an exception
>   // broadcast the data
>   // flatMap the RDD using the data
> }
>
> mapped.foreachRDD { rdd =>
>   try {
>     // write some data in a DB
>   } catch {
>     case t: Throwable =>
>       dstream.context.stop(stopSparkContext = true, stopGracefully = false)
>   }
> }
>
> mapped.foreachRDD { rdd =>
>   try {
>     // write data to Kafka
>     // manually checkpoint the Kafka offsets (because I need them in JSON format)
>   } catch {
>     case t: Throwable =>
>       dstream.context.stop(stopSparkContext = true, stopGracefully = false)
>   }
> }
> {noformat}
>
> The issue appears when stop is invoked. At the point when SparkUI is stopped, it enters that infinite loop. Initially I thought it related to Jetty, as the version used in SparkUI had some bugs (e.g. [this one|https://bugs.eclipse.org/bugs/show_bug.cgi?id=452465]). I bumped Jetty to a more recent version (March 2017) and built Spark 2.1.0 with that one, but still got the error.
>
> I encountered this issue with Spark 2.1.0 built with Hadoop 2.6 on top of Mesos.
[jira] [Updated] (SPARK-20321) Spark UI cannot be shutdown in spark streaming app
[ https://issues.apache.org/jira/browse/SPARK-20321?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Andrei Taleanu updated SPARK-20321:
-----------------------------------

Description: updated (the full text is quoted in the comment above). The update adds that Jetty was bumped to a more recent version (March 2017) and Spark 2.1.0 was rebuilt with it but the error remained, and fixes `dstream transform` to `dstream.transform` in the shared snippet.
[jira] [Created] (SPARK-20323) Calling stop in a transform stage causes the app to hang
Andrei Taleanu created SPARK-20323:
--------------------------------------

 Summary: Calling stop in a transform stage causes the app to hang
     Key: SPARK-20323
     URL: https://issues.apache.org/jira/browse/SPARK-20323
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: Andrei Taleanu

I'm not sure if this is a bug or just the way it needs to happen, but I've run into this issue with the following code:

{noformat}
object ImmortalStreamingJob extends App {
  val conf = new SparkConf().setAppName("fun-spark").setMaster("local[*]")
  val ssc = new StreamingContext(conf, Seconds(1))

  val elems = (1 to 1000).grouped(10)
    .map(seq => ssc.sparkContext.parallelize(seq))
    .toSeq
  val stream = ssc.queueStream(mutable.Queue[RDD[Int]](elems: _*))

  val transformed = stream.transform { rdd =>
    try {
      if (Random.nextInt(6) == 5) throw new RuntimeException("boom")
      else println("lucky bastard")
      rdd
    } catch {
      case e: Throwable =>
        println("stopping streaming context", e)
        ssc.stop(stopSparkContext = true, stopGracefully = false)
        throw e
    }
  }

  transformed.foreachRDD { rdd =>
    println(rdd.collect().mkString(","))
  }

  ssc.start()
  ssc.awaitTermination()
}
{noformat}

There are two things I can note here:
* if the exception is thrown in the first transformation (when the first RDD is processed), the spark context is stopped and the app dies
* if the exception is thrown after at least one RDD has been processed, the app hangs after printing the error message and never stops

I think there's some sort of deadlock in the second case, is that normal? I also asked this [here|http://stackoverflow.com/questions/43273783/immortal-spark-streaming-job/43373624#43373624] but up to this point there's no answer pointing exactly to what happens, only guidelines.
[jira] [Created] (SPARK-20321) Spark UI cannot be shutdown in spark streaming app
Andrei Taleanu created SPARK-20321:
--------------------------------------

 Summary: Spark UI cannot be shutdown in spark streaming app
     Key: SPARK-20321
     URL: https://issues.apache.org/jira/browse/SPARK-20321
 Project: Spark
  Issue Type: Bug
  Components: Spark Core
Affects Versions: 2.1.0
Reporter: Andrei Taleanu

When an exception thrown in the transform stage is handled in foreachRDD and the streaming context is forced to stop, the SparkUI appears to hang and continually dumps the following logs in an infinite loop:

{noformat}
...
2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop woken up from select, 0/0 selected
2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop waiting on select
2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop woken up from select, 0/0 selected
2017-04-12 14:11:13,470 [SparkUI-50-selector-ServerConnectorManager@512d4583/1] DEBUG org.spark_project.jetty.io.SelectorManager - Selector loop waiting on select
...
{noformat}

Unfortunately I don't have a minimal example that reproduces this issue, but here is what I can share:

{noformat}
val dstream = ... // pull data from Kafka
val mapped = dstream.transform { rdd =>
  val data = getData // perform a call that potentially throws an exception
  // broadcast the data
  // flatMap the RDD using the data
}

mapped.foreachRDD { rdd =>
  try {
    // write some data in a DB
  } catch {
    case t: Throwable =>
      dstream.context.stop(stopSparkContext = true, stopGracefully = false)
  }
}

mapped.foreachRDD { rdd =>
  try {
    // write data to Kafka
    // manually checkpoint the Kafka offsets (because I need them in JSON format)
  } catch {
    case t: Throwable =>
      dstream.context.stop(stopSparkContext = true, stopGracefully = false)
  }
}
{noformat}

The issue appears when stop is invoked. At the point when SparkUI is stopped, it enters that infinite loop. Initially I thought it related to Jetty, as the version used in SparkUI had some bugs (e.g. [this one|https://bugs.eclipse.org/bugs/show_bug.cgi?id=452465]). I bumped Jetty to a more recent version (March 2017) but still got the error.

I encountered this issue with Spark 2.1.0 built with Hadoop 2.6 on top of Mesos.
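A pattern worth noting for the snippet above (a sketch, not the resolution of this issue): instead of calling stop() inside foreachRDD, let the output operation rethrow, and surface the error in the main thread via the context's await call, stopping exactly once in a finally. FakeContext below is an invented stand-in that mimics only the small part of StreamingContext's contract this pattern relies on.

```scala
object DriverPattern {
  // FakeContext mimics only the part of StreamingContext used here:
  // awaitTermination rethrows an error reported by a failed batch job.
  final class FakeContext {
    @volatile private var error: Option[Throwable] = None
    @volatile var stopped = false
    def reportError(e: Throwable): Unit = error = Some(e) // as a failed batch would
    def awaitTermination(): Unit = error.foreach(e => throw e)
    def stop(): Unit = stopped = true
  }

  // Driver pattern: output ops rethrow instead of stopping the context
  // themselves; the main thread observes the error and stops once.
  def runDriver(ctx: FakeContext): Unit =
    try ctx.awaitTermination()
    catch { case e: Throwable => println(s"stopping streaming context: ${e.getMessage}") }
    finally ctx.stop()
}
```

The design point is that shutdown responsibility lives in one thread (the main one), which sidesteps the self-blocking stop() call that appears to trigger the SparkUI hang described above.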