yiheng commented on a change in pull request #242: [LIVY-336][SERVER] Livy should not spawn one thread per job to track the job on Yarn URL: https://github.com/apache/incubator-livy/pull/242#discussion_r351092771
########## File path: server/src/main/scala/org/apache/livy/utils/SparkYarnApp.scala ########## @@ -254,50 +268,45 @@ class SparkYarnApp private[utils] ( } } - // Exposed for unit test. - // TODO Instead of spawning a thread for every session, create a centralized thread and - // batch YARN queries. - private[utils] val yarnAppMonitorThread = Utils.startDaemonThread(s"yarnAppMonitorThread-$this") { + private var yarnTagToAppIdFailedTimes: Int = _ + + private def failToMonitor(): Unit = { + changeState(SparkApp.State.FAILED) + process.foreach(_.destroy()) + leakedAppTags.put(appTag, System.currentTimeMillis()) + } + + private def monitorSparkYarnApp(): Unit = { try { + if (killed) { + changeState(SparkApp.State.KILLED) + } else if (isProcessErrExit()) { + changeState(SparkApp.State.FAILED) + } // If appId is not known, query YARN by appTag to get it. - val appId = try { - appIdOption.map(ConverterUtils.toApplicationId).getOrElse { - val pollInterval = getYarnPollInterval(livyConf) - val deadline = getYarnTagToAppIdTimeout(livyConf).fromNow - getAppIdFromTag(appTag, pollInterval, deadline) + if (appId.isEmpty) { + appId = getAppId() + if (appId.isEmpty) { + throw new IllegalStateException(s"No YARN application is found with tag " + Review comment: instead of throwing an exception, can we move the exception handle code here? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services