[ https://issues.apache.org/jira/browse/SPARK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827181#comment-15827181 ]
hustfxj edited comment on SPARK-19264 at 1/18/17 1:16 AM:
----------------------------------------------------------

[~srowen] Sorry, I didn't say it clearly. I mean the Spark application can't finish while it still has unfinished non-daemon threads. Look at the following example: the driver program should crash because of the exception thrown in foreachRDD, but in fact it can't exit, because the non-daemon timer thread is still running.

{code:title=Test.scala|borderStyle=solid}
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// Non-daemon thread: the thread created here inherits the main thread's
// non-daemon status, so it keeps the JVM alive on its own.
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })
scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
    def run(): Unit = {
      try {
        System.out.println("runnable")
      } catch {
        case e: Exception =>
          System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
      }
    }
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

// WordReceiver is a custom receiver defined elsewhere in the application.
val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 10)
wordCounts.foreachRDD { rdd =>
  rdd.collect().foreach(println)
  // This exception should bring the driver down, but the JVM stays alive
  // because "Driver-Commit-Thread" is still running as a non-daemon thread.
  throw new RuntimeException
}

ssc.start()
ssc.awaitTermination()
{code}
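For reference, the JVM behavior at the root of this can be reproduced without Spark at all. The following is only a minimal standalone sketch (the object name NonDaemonDemo and the thread name are made up for illustration): the main thread dies with an uncaught exception, but the process keeps running because a non-daemon scheduler thread is still alive.

{code:title=NonDaemonDemo.scala|borderStyle=solid}
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object NonDaemonDemo {
  def main(args: Array[String]): Unit = {
    // The worker thread created here is non-daemon (it inherits the main
    // thread's non-daemon status), just like "Driver-Commit-Thread" above.
    val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = new Thread(r, "Demo-Commit-Thread")
    })
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = System.out.println("tick from non-daemon thread")
    }, 0, 1, TimeUnit.SECONDS)

    // The main thread dies here with an uncaught exception, but the process
    // keeps printing "tick" forever: the JVM only exits once every non-daemon
    // thread has finished (or System.exit is called).
    throw new RuntimeException("main thread is gone")
  }
}
{code}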
> Worker should start the driver, the same as the AM of YARN
> -----------------------------------------------------------
>
>                 Key: SPARK-19264
>                 URL: https://issues.apache.org/jira/browse/SPARK-19264
>             Project: Spark
>          Issue Type: Improvement
>            Reporter: hustfxj
>
> I think the worker shouldn't start the driver through "ProcessBuilderLike", because then we can't tell whether the application's main thread has finished when the application also contains non-daemon threads: the program only terminates once no non-daemon thread is left running (or someone calls System.exit), while the main thread may have finished long ago.
> The worker should start the driver the way the AM of YARN does, as follows:
> {code:title=ApplicationMaster.scala|borderStyle=solid}
> mainMethod.invoke(null, userArgs.toArray)
> finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
> logDebug("Done running users class")
> {code}
> Then the worker can monitor the driver's main thread and know the application's state.
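For illustration only, a rough sketch of that proposal (the names DriverMonitorSketch and runAndMonitor are hypothetical, not Spark's real worker API): run the user's main method reflectively in a dedicated thread, as the ApplicationMaster does, and join on that thread so the worker learns the outcome of the main thread even if non-daemon threads keep the JVM alive.

{code:title=DriverMonitorSketch.scala|borderStyle=solid}
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.AtomicReference

// Hypothetical illustration only: these names are not Spark's real worker APIs.
object DriverMonitorSketch {

  /** Run the user class's main() in its own thread and report how it ended. */
  def runAndMonitor(userClassName: String, userArgs: Array[String]): Unit = {
    val finalStatus = new AtomicReference[String]("UNDEFINED")

    val userThread = new Thread("Driver-User-Main") {
      override def run(): Unit = {
        try {
          val mainMethod =
            Class.forName(userClassName).getMethod("main", classOf[Array[String]])
          // Same pattern as ApplicationMaster: invoke the user class reflectively
          // and record success as soon as main() returns, no matter how many
          // non-daemon threads the user code leaves running.
          mainMethod.invoke(null, userArgs)
          finalStatus.set("SUCCEEDED")
        } catch {
          case e: InvocationTargetException =>
            // The user's main() threw: report failure with the real cause.
            finalStatus.set("FAILED")
            e.getCause.printStackTrace()
          case e: Throwable =>
            finalStatus.set("FAILED")
            e.printStackTrace()
        }
      }
    }
    userThread.start()

    // Instead of waiting for the whole JVM to exit, the worker joins on the
    // user's main thread and can report the application's state right away.
    userThread.join()
    println(s"driver main thread finished with status ${finalStatus.get()}")
  }
}
{code}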