[ 
https://issues.apache.org/jira/browse/SPARK-19264?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827181#comment-15827181
 ] 

hustfxj edited comment on SPARK-19264 at 1/18/17 1:18 AM:
----------------------------------------------------------

[~srowen] sorry, I didn't say it clearly. I means the driver can't be done when 
it contains other unfinished non-daemon threads. Look at the followed example. 
the driver program should crash due to the exception. But In fact the driver 
program can't crash because the timer threads still are runnning.
{code:title=Test.scala|borderStyle=solid}   
   val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    sparkConf.set("spark.streaming.blockInterval", "1000ms")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    //non-daemon thread
    val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactory() {
        def newThread(r: Runnable): Thread = new Thread(r, 
"Driver-Commit-Thread")
      })
    scheduledExecutorService.scheduleAtFixedRate(
      new Runnable() {
        def run() {
          try {
            System.out.println("runable")
          } catch {
            case e: Exception => {
              System.out.println("ScheduledTask persistAllConsumerOffset 
exception", e)
            }
          }
        }
      }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)
    val lines = ssc.receiverStream(new 
WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + 
y, 10)
    wordCounts.foreachRDD{rdd =>
      rdd.collect().foreach(println)
      throw new RuntimeException
    }
    ssc.start()
    ssc.awaitTermination()
{code}



was (Author: hustfxj):
[~srowen] sorry, I didn't say it clearly. I means the spark application can't 
be done when it contains other unfinished non-daemon. Look at the followed 
example. the driver program should crash due to the exception. But In fact the 
driver program can't crash because the timer threads still are runnning.
{code:title=Test.scala|borderStyle=solid}   
   val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    sparkConf.set("spark.streaming.blockInterval", "1000ms")
    val ssc = new StreamingContext(sparkConf, Seconds(10))
    //non-daemon thread
    val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactory() {
        def newThread(r: Runnable): Thread = new Thread(r, 
"Driver-Commit-Thread")
      })
    scheduledExecutorService.scheduleAtFixedRate(
      new Runnable() {
        def run() {
          try {
            System.out.println("runable")
          } catch {
            case e: Exception => {
              System.out.println("ScheduledTask persistAllConsumerOffset 
exception", e)
            }
          }
        }
      }, 1000, 1000 * 5, TimeUnit.MILLISECONDS)
    val lines = ssc.receiverStream(new 
WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + 
y, 10)
    wordCounts.foreachRDD{rdd =>
      rdd.collect().foreach(println)
      throw new RuntimeException
    }
    ssc.start()
    ssc.awaitTermination()
{code}


> Work should start driver, the same to  AM  of yarn 
> ---------------------------------------------------
>
>                 Key: SPARK-19264
>                 URL: https://issues.apache.org/jira/browse/SPARK-19264
>             Project: Spark
>          Issue Type: Improvement
>            Reporter: hustfxj
>
>   I think work can't start driver by "ProcessBuilderLike",  thus we can't 
> know the application's main thread is finished or not if the application's 
> main thread contains some non-daemon threads. Because the program terminates 
> when there no longer is any non-daemon thread running (or someone called 
> System.exit). The main thread can have finished long ago. 
>     worker should  start driver like AM of YARN . As followed:
> {code:title=ApplicationMaster.scala|borderStyle=solid}    
>      mainMethod.invoke(null, userArgs.toArray)
>      finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
>      logDebug("Done running users class")
> {code}
> Then the work can monitor the driver's main thread, and know the 
> application's state. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to