This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.9 by this push:
new fc345fe2d8 [KYUUBI #7041] Fix NPE when getting metadtamanager in
KubernetesApplicationOperation
fc345fe2d8 is described below
commit fc345fe2d89d87e68e7038dc70bf3d0a00db69e3
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Apr 23 20:20:39 2025 -0700
[KYUUBI #7041] Fix NPE when getting metadtamanager in
KubernetesApplicationOperation
### Why are the changes needed?
To fix NPE.
Before, we use below method to get `metadataManager`.
```
private def metadataManager = KyuubiServer.kyuubiServer.backendService
.sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
```
But before the kyuubi server fully restarted, the
`KyuubiServer.kyuubiServer` is null and might throw NPE during batch recovery
phase.
For example:
```
:2025-04-23 14:06:24.040 ERROR [KyuubiSessionManager-exec-pool: Thread-231]
org.apache.kyuubi.engine.KubernetesApplicationOperation: Failed to get
application by label: kyuubi-unique-tag=95116703-4240-4cc1-9886-ccae3a2ac879,
due to Cannot invoke "org.apache.kyuubi.server.KyuubiServer.backendService()"
because the return value of
"org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null
```
### How was this patch tested?
Existing GA.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7041 from turboFei/fix_NPE.
Closes #7041
064d88707 [Wang, Fei] Fix NPE
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit ee677a6feb5cf18f27118a9c335d52af59a1ecb6)
Signed-off-by: Wang, Fei <[email protected]>
---
.../scala/org/apache/kyuubi/engine/ApplicationOperation.scala | 3 ++-
.../scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala | 3 ++-
.../apache/kyuubi/engine/KubernetesApplicationOperation.scala | 9 ++++-----
.../org/apache/kyuubi/engine/KyuubiApplicationManager.scala | 6 ++++--
.../org/apache/kyuubi/engine/YarnApplicationOperation.scala | 3 ++-
.../scala/org/apache/kyuubi/session/KyuubiSessionManager.scala | 7 ++++---
.../test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala | 2 +-
.../org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala | 2 +-
.../kyuubi/engine/KubernetesApplicationOperationSuite.scala | 2 +-
.../org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala | 2 +-
10 files changed, 22 insertions(+), 17 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 23a49c1ae5..b9bce2e851 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -19,13 +19,14 @@ package org.apache.kyuubi.engine
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.server.metadata.MetadataManager
trait ApplicationOperation {
/**
* Step for initializing the instance.
*/
- def initialize(conf: KyuubiConf): Unit
+ def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]):
Unit
/**
* Step to clean up the instance
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index 1d0d58d167..ffc233c016 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -22,13 +22,14 @@ import java.nio.file.Paths
import scala.sys.process._
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.metadata.MetadataManager
class JpsApplicationOperation extends ApplicationOperation {
import ApplicationOperation._
private var runner: String = _
- override def initialize(conf: KyuubiConf): Unit = {
+ override def initialize(conf: KyuubiConf, metadataManager:
Option[MetadataManager]): Unit = {
val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
.map(Paths.get(_, "bin", "jps").toString)
.getOrElse("jps")
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index ee5d06a3c5..81e97dddc6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -36,8 +36,7 @@ import
org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.Kube
import
org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL,
COMPLETED, NONE}
import org.apache.kyuubi.engine.ApplicationState.{isTerminated,
ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING,
UNKNOWN}
import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.server.KyuubiServer
-import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
class KubernetesApplicationOperation extends ApplicationOperation with Logging
{
@@ -77,8 +76,7 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo =>
buildKubernetesClient(kInfo))
}
- private def metadataManager = KyuubiServer.kyuubiServer.backendService
- .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
+ private var metadataManager: Option[MetadataManager] = _
// Visible for testing
private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo):
Unit = {
@@ -113,8 +111,9 @@ class KubernetesApplicationOperation extends
ApplicationOperation with Logging {
}
}
- override def initialize(conf: KyuubiConf): Unit = {
+ override def initialize(conf: KyuubiConf, metadataManager:
Option[MetadataManager]): Unit = {
kyuubiConf = conf
+ this.metadataManager = metadataManager
info("Start initializing Kubernetes application operation.")
submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
// Defer cleaning terminated application information
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index f0965a05aa..cd2101baf8 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -29,10 +29,12 @@ import org.apache.kyuubi.config.KyuubiConf
import
org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.util.reflect.ReflectUtils._
-class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager") {
+class KyuubiApplicationManager(metadataManager: Option[MetadataManager])
+ extends AbstractService("KyuubiApplicationManager") {
// TODO: maybe add a configuration is better
private val operations =
@@ -41,7 +43,7 @@ class KyuubiApplicationManager extends
AbstractService("KyuubiApplicationManager
override def initialize(conf: KyuubiConf): Unit = {
operations.foreach { op =>
try {
- op.initialize(conf)
+ op.initialize(conf, metadataManager)
} catch {
case NonFatal(e) => warn(s"Error starting
${op.getClass.getSimpleName}: ${e.getMessage}")
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 9d6ca32fab..39ee294873 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
import org.apache.kyuubi.engine.ApplicationOperation._
import org.apache.kyuubi.engine.ApplicationState.ApplicationState
import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
+import org.apache.kyuubi.server.metadata.MetadataManager
import org.apache.kyuubi.util.KyuubiHadoopUtils
class YarnApplicationOperation extends ApplicationOperation with Logging {
@@ -40,7 +41,7 @@ class YarnApplicationOperation extends ApplicationOperation
with Logging {
@volatile private var adminYarnClient: Option[YarnClient] = None
private var submitTimeout: Long = _
- override def initialize(conf: KyuubiConf): Unit = {
+ override def initialize(conf: KyuubiConf, metadataManager:
Option[MetadataManager]): Unit = {
submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9edc8218eb..13869051a5 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -51,12 +51,11 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
val operationManager = new KyuubiOperationManager()
val credentialsManager = new HadoopCredentialsManager()
- val applicationManager = new KyuubiApplicationManager()
// Currently, the metadata manager is used by the REST frontend which
provides batch job APIs,
// so we initialize it only when Kyuubi starts with the REST frontend.
- lazy val metadataManager: Option[MetadataManager] =
- if (conf.isRESTEnabled) Some(new MetadataManager()) else None
+ var metadataManager: Option[MetadataManager] = None
+ var applicationManager: KyuubiApplicationManager = _
// lazy is required for plugins since the conf is null when this class
initialization
lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] =
PluginLoader.loadSessionConfAdvisor(conf)
@@ -73,6 +72,8 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
+ if (conf.isRESTEnabled) metadataManager = Some(new MetadataManager())
+ applicationManager = new KyuubiApplicationManager(metadataManager)
addService(applicationManager)
addService(credentialsManager)
metadataManager.foreach(addService)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 8d3f7b17d8..eef2a024b1 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -42,7 +42,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer {
protected lazy val yarnOperation: YarnApplicationOperation = {
val operation = new YarnApplicationOperation()
- operation.initialize(miniYarnService.getConf)
+ operation.initialize(miniYarnService.getConf, None)
operation
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index b667d9c39e..c1e9a25c80 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._
class JpsApplicationOperationSuite extends KyuubiFunSuite {
private val jps = loadFromServiceLoader[ApplicationOperation]()
.find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get
- jps.initialize(null)
+ jps.initialize(null, None)
test("JpsApplicationOperation with jstat") {
assert(jps.isSupported(ApplicationManagerInfo(None)))
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
index 2ea1939d2f..7454ad6529 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
@@ -28,7 +28,7 @@ class KubernetesApplicationOperationSuite extends
KyuubiFunSuite {
kyuubiConf.set(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST.key, "ns1,ns2")
val operation = new KubernetesApplicationOperation()
- operation.initialize(kyuubiConf)
+ operation.initialize(kyuubiConf, None)
operation.checkKubernetesInfo(KubernetesInfo(None, None))
operation.checkKubernetesInfo(KubernetesInfo(Some("1"), None))
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index bc53563f78..cd0e0d6c70 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -49,7 +49,7 @@ import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CL
class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
- private val engineMgr = new KyuubiApplicationManager()
+ private val engineMgr = new KyuubiApplicationManager(None)
override protected lazy val conf: KyuubiConf = KyuubiConf()
.set(AUTHENTICATION_METHOD, Seq("CUSTOM"))