wForget commented on code in PR #6386:
URL: https://github.com/apache/kyuubi/pull/6386#discussion_r1597398933
##########
kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala:
##########
@@ -95,74 +95,139 @@ class YarnApplicationOperation extends
ApplicationOperation with Logging {
}
}
+ private def withNewYarnClient[T](
+ proxyUser: Option[String],
+ sessionConf: Option[KyuubiConf])(action: YarnClient => T): T = {
+ (sessionConf) match {
+ case Some(sessionConf) =>
+ (proxyUser) match {
+ case Some(user) =>
+ Utils.doAs(user) { () =>
+ var yarnClient: YarnClient = null
+ try {
+ val yarnConf =
KyuubiHadoopUtils.newYarnConfiguration(sessionConf)
+ yarnClient = createYarnClient(yarnConf)
+ action(yarnClient)
+ } finally {
+ Utils.tryLogNonFatalError(yarnClient.close())
+ }
+ }
+ case None =>
+ var yarnClient: YarnClient = null
+ try {
+ yarnClient = createAdminYarnClient(sessionConf).get
+ action(yarnClient)
+ } finally {
+ Utils.tryLogNonFatalError(yarnClient.close())
+ }
+ }
+ case None =>
+ withYarnClient(proxyUser)(action)
+ }
+ }
+
+ def createAdminYarnClient(conf: KyuubiConf): Option[YarnClient] = {
+
+ val yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
+
+ def createYarnClientWithCurrentUser(): Option[YarnClient] = {
+ val c = createYarnClient(yarnConf)
+ info(s"Creating admin YARN client with current user:
${Utils.currentUser}.")
+ Some(c);
+ }
+
+ def createYarnClientWithProxyUser(proxyUser: String): Option[YarnClient] =
+ Utils.doAs(proxyUser) { () =>
+ val c = createYarnClient(yarnConf)
+ info(s"Creating admin YARN client with proxy user: $proxyUser.")
+ Some(c);
+ }
+
+ YarnUserStrategy.withName(conf.get(KyuubiConf.YARN_USER_STRATEGY)) match {
+ case NONE =>
+ createYarnClientWithCurrentUser()
+ case ADMIN if conf.get(KyuubiConf.YARN_USER_ADMIN) == Utils.currentUser
=>
+ createYarnClientWithCurrentUser()
+ case ADMIN =>
+ createYarnClientWithProxyUser(conf.get(KyuubiConf.YARN_USER_ADMIN))
+ case OWNER =>
+ info("Skip initializing admin YARN client")
+ None
+ }
+ }
+
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean =
appMgrInfo.resourceManager.exists(_.toLowerCase(Locale.ROOT).startsWith("yarn"))
override def killApplicationByTag(
+ sessionConf: Option[KyuubiConf],
appMgrInfo: ApplicationManagerInfo,
tag: String,
- proxyUser: Option[String] = None): KillResponse =
withYarnClient(proxyUser) { yarnClient =>
- try {
- val reports = yarnClient.getApplications(null, null, Set(tag).asJava)
- if (reports.isEmpty) {
- (false, NOT_FOUND)
- } else {
- try {
- val applicationId = reports.get(0).getApplicationId
- yarnClient.killApplication(applicationId)
- (true, s"Succeeded to terminate: $applicationId with $tag")
- } catch {
- case e: Exception =>
- (false, s"Failed to terminate application with $tag, due to
${e.getMessage}")
+ proxyUser: Option[String] = None): KillResponse =
+ withNewYarnClient(proxyUser, sessionConf) { yarnClient =>
Review Comment:
It seems that `newYarnClient` is not always needed. Can we use it only when
needed? Like, the `hadoop_conf_dir/yarn_conf_dir` engine evn exists in
`sessionConf`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]