[ https://issues.apache.org/jira/browse/SPARK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827181#comment-15827181 ]
hustfxj edited comment on SPARK-19264 at 1/18/17 1:18 AM:
----------------------------------------------------------

[~srowen] Sorry, I didn't say it clearly. I mean the driver can't finish while it still contains unfinished non-daemon threads. Look at the following example: the driver program should crash because of the exception thrown in {{foreachRDD}}, but in fact it can't, because the non-daemon timer thread is still running.

{code:title=Test.scala|borderStyle=solid}
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))

// non-daemon thread: it keeps the JVM alive even after the main thread dies
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
  new ThreadFactory() {
    def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
  })
scheduledExecutorService.scheduleAtFixedRate(
  new Runnable() {
    def run() {
      try {
        System.out.println("runnable")
      } catch {
        case e: Exception =>
          System.out.println("ScheduledTask persistAllConsumerOffset exception: " + e)
      }
    }
  }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)

// WordReceiver is a custom receiver (not shown here)
val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y, 10)
wordCounts.foreachRDD { rdd =>
  rdd.collect().foreach(println)
  // this exception should bring the driver down, but the JVM keeps running
  throw new RuntimeException
}

ssc.start()
ssc.awaitTermination()
{code}
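For reference (this example is not from the original report), the behaviour above is plain JVM semantics rather than anything Spark-specific: an uncaught exception kills only the thread it was thrown in, and the JVM keeps running as long as any non-daemon thread is alive. A minimal, Spark-free sketch:

{code:title=NonDaemonDemo.scala|borderStyle=solid}
import java.util.concurrent.{Executors, TimeUnit}

object NonDaemonDemo {
  def main(args: Array[String]): Unit = {
    // Executors.defaultThreadFactory() creates non-daemon threads,
    // so this scheduler keeps the JVM alive on its own.
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      def run(): Unit = println("still alive")
    }, 0, 1, TimeUnit.SECONDS)

    // This kills only the main thread: the stack trace is printed,
    // but the process keeps printing "still alive" until it is killed
    // (or scheduler.shutdown() / System.exit() is called).
    throw new RuntimeException("main thread dies here")
  }
}
{code}

Running this prints the stack trace of the RuntimeException and then keeps printing "still alive" indefinitely, which is exactly what happens to the driver in the example above.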
> Worker should start driver, the same as the AM of YARN
> -------------------------------------------------------
>
>                 Key: SPARK-19264
>                 URL: https://issues.apache.org/jira/browse/SPARK-19264
>             Project: Spark
>          Issue Type: Improvement
>            Reporter: hustfxj
>
> I don't think the worker should start the driver through "ProcessBuilderLike", because then
> we can't tell whether the application's main thread has finished when the application
> contains non-daemon threads: the program terminates only once no non-daemon thread is
> running any more (or someone calls System.exit), so the main thread may have finished
> long ago. The worker should start the driver the way the AM of YARN does, as follows:
> {code:title=ApplicationMaster.scala|borderStyle=solid}
> mainMethod.invoke(null, userArgs.toArray)
> finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
> logDebug("Done running users class")
> {code}
> Then the worker can monitor the driver's main thread and know the application's state.
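Not part of the original issue, but roughly what the proposal could look like on the worker side, modelled on how ApplicationMaster runs the user class: invoke the driver's main method on a dedicated thread and monitor that thread instead of a separate process. The names below (MonitoredDriverRunner, reportDriverState) are hypothetical and are not existing Spark Worker/DriverRunner APIs.

{code:title=MonitoredDriverRunner.scala|borderStyle=solid}
import java.lang.reflect.InvocationTargetException

object MonitoredDriverRunner {
  def run(driverClassName: String, userArgs: Array[String]): Unit = {
    val mainMethod = Class.forName(driverClassName).getMethod("main", classOf[Array[String]])

    val driverThread = new Thread(new Runnable {
      def run(): Unit = {
        try {
          mainMethod.invoke(null, userArgs)
          reportDriverState("FINISHED", None)
        } catch {
          case e: InvocationTargetException =>
            // the user's main method threw: surface it as a driver failure
            reportDriverState("FAILED", Option(e.getCause))
        }
      }
    }, "driver-main")

    driverThread.start()
    // The worker can now monitor this thread (join it, check liveness, ...)
    // instead of only watching a child process exit.
    driverThread.join()
  }

  // Placeholder: a real implementation would report back to the worker/master.
  private def reportDriverState(state: String, error: Option[Throwable]): Unit =
    println(s"driver state: $state" + error.map(e => s" ($e)").getOrElse(""))
}
{code}

With something like this, a timer thread left running by user code no longer hides the fact that the main method has already returned or thrown.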