This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 1c7113b1aa [KYUUBI #7027] Support to initialize kubernetes clients on
kyuubi server startup
1c7113b1aa is described below
commit 1c7113b1aae7eeaab08a0724ed4071ccbc3cd007
Author: Wang, Fei <[email protected]>
AuthorDate: Tue Apr 15 22:36:16 2025 -0700
[KYUUBI #7027] Support to initialize kubernetes clients on kyuubi server
startup
### Why are the changes needed?
This ensures the Kyuubi server is promptly informed of any Kubernetes
resource changes after startup. It is highly recommended to set it when
running multiple Kyuubi instances.
### How was this patch tested?
Existing GA and Integration testing.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7027 from turboFei/k8s_client_init.
Closes #7027
393b9960a [Wang, Fei] server only
a640278c4 [Wang, Fei] refresh
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 4fc201e85dd37dfbc032f7b30d5e38d9237db282)
Signed-off-by: Wang, Fei <[email protected]>
---
docs/configuration/settings.md | 1 +
.../org/apache/kyuubi/config/KyuubiConf.scala | 12 +++++++++++
.../engine/KubernetesApplicationOperation.scala | 23 ++++++++++++++++++++++
.../kyuubi/engine/KyuubiApplicationManager.scala | 5 +++++
.../org/apache/kyuubi/server/KyuubiServer.scala | 2 ++
.../KubernetesApplicationOperationSuite.scala | 17 ++++++++++++++++
6 files changed, 60 insertions(+)
diff --git a/docs/configuration/settings.md b/docs/configuration/settings.md
index 02902b97df..b143bae04d 100644
--- a/docs/configuration/settings.md
+++ b/docs/configuration/settings.md
@@ -356,6 +356,7 @@ You can configure the Kyuubi properties in
`$KYUUBI_HOME/conf/kyuubi-defaults.co
| kyuubi.kubernetes.authenticate.clientKeyFile |
<undefined> |
Path to the client key file for connecting to the Kubernetes API server over
TLS from the kyuubi. Specify this as a path as opposed to a URI (i.e. do not
provide a scheme)
[...]
| kyuubi.kubernetes.authenticate.oauthToken |
<undefined> |
The OAuth token to use when authenticating against the Kubernetes API server.
Note that unlike, the other authentication options, this must be the exact
string value of the token to use for the authentication.
[...]
| kyuubi.kubernetes.authenticate.oauthTokenFile |
<undefined> |
Path to the file containing the OAuth token to use when authenticating against
the Kubernetes API server. Specify this as a path as opposed to a URI (i.e. do
not provide a scheme)
[...]
+| kyuubi.kubernetes.client.initialize.list
|| The
kubernetes client initialize list to register kubernetes resource informers
during Kyuubi server startup. This ensure the Kyuubi server is promptly
informed for any Kubernetes resource changes after startup. It is highly
recommend to set it for multiple Kyuubi instances mode. The format is
`context1:namespace1,context2:namespace2`. [...]
| kyuubi.kubernetes.context |
<undefined> |
The desired context from your kubernetes config file used to configure the K8s
client for interacting with the cluster.
[...]
| kyuubi.kubernetes.context.allow.list
|| The
allowed kubernetes context list, if it is empty, there is no kubernetes context
limitation.
[...]
| kyuubi.kubernetes.master.address |
<undefined> |
The internal Kubernetes master (API server) address to be used for kyuubi.
[...]
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
index 6588b3fb87..38a6384976 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala
@@ -1236,6 +1236,18 @@ object KyuubiConf {
.toSet()
.createWithDefault(Set.empty)
+ val KUBERNETES_CLIENT_INITIALIZE_LIST: ConfigEntry[Seq[String]] =
+ buildConf("kyuubi.kubernetes.client.initialize.list")
+ .doc("The kubernetes client initialize list to register kubernetes
resource informers" +
+ " during Kyuubi server startup. This ensure the Kyuubi server is
promptly informed for" +
+ " any Kubernetes resource changes after startup. It is highly
recommend to set it for" +
+ " multiple Kyuubi instances mode. The format is
`context1:namespace1,context2:namespace2`.")
+ .version("1.11.0")
+ .serverOnly
+ .stringConf
+ .toSequence()
+ .createWithDefault(Nil)
+
val KUBERNETES_MASTER: OptionalConfigEntry[String] =
buildConf("kyuubi.kubernetes.master.address")
.doc("The internal Kubernetes master (API server) address to be used for
kyuubi.")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index 59faee4868..55f0fa6613 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -167,6 +167,29 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
TimeUnit.MILLISECONDS)
cleanupCanceledAppPodExecutor = ThreadUtils.newDaemonCachedThreadPool(
"cleanup-canceled-app-pod-thread")
+ initializeKubernetesClient(kyuubiConf)
+ }
+
+ private[kyuubi] def getKubernetesClientInitializeInfo(
+ kyuubiConf: KyuubiConf): Seq[KubernetesInfo] = {
+ kyuubiConf.get(KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST).map { init =>
+ val (context, namespace) = init.split(":") match {
+ case Array(ctx, ns) => (Some(ctx).filterNot(_.isEmpty),
Some(ns).filterNot(_.isEmpty))
+ case Array(ctx) => (Some(ctx).filterNot(_.isEmpty), None)
+ case _ => (None, None)
+ }
+ KubernetesInfo(context, namespace)
+ }
+ }
+
+ private[kyuubi] def initializeKubernetesClient(kyuubiConf: KyuubiConf): Unit
= {
+ getKubernetesClientInitializeInfo(kyuubiConf).foreach { kubernetesInfo =>
+ try {
+ getOrCreateKubernetesClient(kubernetesInfo)
+ } catch {
+ case e: Throwable => error(s"Failed to initialize Kubernetes client
for $kubernetesInfo", e)
+ }
+ }
}
override def isSupported(appMgrInfo: ApplicationManagerInfo): Boolean = {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index a80f13a86c..b4095c424f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -96,6 +96,11 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
case None => None
}
}
+
+ private[kyuubi] def getKubernetesApplicationOperation:
Option[KubernetesApplicationOperation] = {
+ operations.find(_.isInstanceOf[KubernetesApplicationOperation])
+ .map(_.asInstanceOf[KubernetesApplicationOperation])
+ }
}
object KyuubiApplicationManager {
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index 338ac6b414..0996472653 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -124,6 +124,8 @@ object KyuubiServer extends Logging {
val refreshedKubernetesConf =
createKyuubiConf().getAll.filter(_._1.startsWith(KYUUBI_KUBERNETES_CONF_PREFIX))
refreshConfig("kubernetes", existedKubernetesConf, refreshedKubernetesConf)
+
kyuubiServer.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].applicationManager
+
.getKubernetesApplicationOperation.foreach(_.initializeKubernetesClient(kyuubiServer.conf))
}
private def refreshConfig(
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
index ab663a0074..f6bfc409db 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
@@ -96,4 +96,21 @@ class KubernetesApplicationOperationSuite extends
KyuubiFunSuite {
sparkUiPort) ===
s"http://$sparkDriverSvc.$kubernetesNamespace.svc.$kubernetesContext.k8s.io:$sparkUiPort")
}
+
+ test("get kubernetes client initialization info") {
+ val kyuubiConf = KyuubiConf()
+ kyuubiConf.set(
+ KyuubiConf.KUBERNETES_CLIENT_INITIALIZE_LIST.key,
+ "c1:ns1,c1:ns2,c2:ns1,c2:ns2,c1:,:ns1")
+
+ val operation = new KubernetesApplicationOperation()
+ assert(operation.getKubernetesClientInitializeInfo(kyuubiConf) ===
+ Array(
+ KubernetesInfo(Some("c1"), Some("ns1")),
+ KubernetesInfo(Some("c1"), Some("ns2")),
+ KubernetesInfo(Some("c2"), Some("ns1")),
+ KubernetesInfo(Some("c2"), Some("ns2")),
+ KubernetesInfo(Some("c1"), None),
+ KubernetesInfo(None, Some("ns1"))))
+ }
}