My spark main thread create some daemon thread. Then the spark application
throw some exceptions, and the main thread will quit. But the jvm of driver
don't crash, so How can i do?
for example:
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
sparkConf.set("spark.streaming.blockInterval", "1000ms")
val ssc = new StreamingContext(sparkConf, Seconds(10))
//daemon thread
val scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
def newThread(r: Runnable): Thread = new Thread(r, "Driver-Commit-Thread")
})
scheduledExecutorService.scheduleAtFixedRate(
new Runnable() {
def run() {
try {
System.out.println("runable")
} catch {
case e: Exception => {
System.out.println("ScheduledTask persistAllConsumerOffset
exception", e)
}
}
}
}, 1000, 1000 * 5, TimeUnit.MILLISECONDS)
Thread.sleep(1005)
val lines = ssc.receiverStream(new WordReceiver(StorageLevel.MEMORY_AND_DISK_2))
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey((x: Int, y: Int) => x + y,
10)
wordCounts.foreachRDD{rdd =>
rdd.collect().foreach(println)
throw new RuntimeException
}
ssc.start()
try {
ssc.awaitTermination()
} catch {
case e: Exception => {
System.out.println("end!!!!!")
throw e
}
}