This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.9
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.9 by this push:
     new fc345fe2d8 [KYUUBI #7041] Fix NPE when getting metadtamanager in 
KubernetesApplicationOperation
fc345fe2d8 is described below

commit fc345fe2d89d87e68e7038dc70bf3d0a00db69e3
Author: Wang, Fei <[email protected]>
AuthorDate: Wed Apr 23 20:20:39 2025 -0700

    [KYUUBI #7041] Fix NPE when getting metadtamanager in 
KubernetesApplicationOperation
    
    ### Why are the changes needed?
    
    To fix NPE.
    
    Before, we use below method to get `metadataManager`.
    ```
    private def metadataManager = KyuubiServer.kyuubiServer.backendService
        .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
    ```
    But before the kyuubi server fully restarted, the 
`KyuubiServer.kyuubiServer` is null and might throw NPE during batch recovery 
phase.
    
    For example:
    
    ```
    :2025-04-23 14:06:24.040 ERROR [KyuubiSessionManager-exec-pool: Thread-231] 
org.apache.kyuubi.engine.KubernetesApplicationOperation: Failed to get 
application by label: kyuubi-unique-tag=95116703-4240-4cc1-9886-ccae3a2ac879, 
due to Cannot invoke "org.apache.kyuubi.server.KyuubiServer.backendService()" 
because the return value of 
"org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null
    ```
    
    ### How was this patch tested?
    
    Existing GA.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #7041 from turboFei/fix_NPE.
    
    Closes #7041
    
    064d88707 [Wang, Fei] Fix NPE
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
    (cherry picked from commit ee677a6feb5cf18f27118a9c335d52af59a1ecb6)
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../scala/org/apache/kyuubi/engine/ApplicationOperation.scala    | 3 ++-
 .../scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala | 3 ++-
 .../apache/kyuubi/engine/KubernetesApplicationOperation.scala    | 9 ++++-----
 .../org/apache/kyuubi/engine/KyuubiApplicationManager.scala      | 6 ++++--
 .../org/apache/kyuubi/engine/YarnApplicationOperation.scala      | 3 ++-
 .../scala/org/apache/kyuubi/session/KyuubiSessionManager.scala   | 7 ++++---
 .../test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala    | 2 +-
 .../org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala  | 2 +-
 .../kyuubi/engine/KubernetesApplicationOperationSuite.scala      | 2 +-
 .../org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala     | 2 +-
 10 files changed, 22 insertions(+), 17 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
index 23a49c1ae5..b9bce2e851 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ApplicationOperation.scala
@@ -19,13 +19,14 @@ package org.apache.kyuubi.engine
 
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
+import org.apache.kyuubi.server.metadata.MetadataManager
 
 trait ApplicationOperation {
 
   /**
    * Step for initializing the instance.
    */
-  def initialize(conf: KyuubiConf): Unit
+  def initialize(conf: KyuubiConf, metadataManager: Option[MetadataManager]): 
Unit
 
   /**
    * Step to clean up the instance
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
index 1d0d58d167..ffc233c016 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/JpsApplicationOperation.scala
@@ -22,13 +22,14 @@ import java.nio.file.Paths
 import scala.sys.process._
 
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.server.metadata.MetadataManager
 
 class JpsApplicationOperation extends ApplicationOperation {
   import ApplicationOperation._
 
   private var runner: String = _
 
-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf, metadataManager: 
Option[MetadataManager]): Unit = {
     val jps = sys.env.get("JAVA_HOME").orElse(sys.props.get("java.home"))
       .map(Paths.get(_, "bin", "jps").toString)
       .getOrElse("jps")
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
index ee5d06a3c5..81e97dddc6 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KubernetesApplicationOperation.scala
@@ -36,8 +36,7 @@ import 
org.apache.kyuubi.config.KyuubiConf.KubernetesApplicationStateSource.Kube
 import 
org.apache.kyuubi.config.KyuubiConf.KubernetesCleanupDriverPodStrategy.{ALL, 
COMPLETED, NONE}
 import org.apache.kyuubi.engine.ApplicationState.{isTerminated, 
ApplicationState, FAILED, FINISHED, KILLED, NOT_FOUND, PENDING, RUNNING, 
UNKNOWN}
 import org.apache.kyuubi.operation.OperationState
-import org.apache.kyuubi.server.KyuubiServer
-import org.apache.kyuubi.session.KyuubiSessionManager
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.util.{KubernetesUtils, ThreadUtils}
 
 class KubernetesApplicationOperation extends ApplicationOperation with Logging 
{
@@ -77,8 +76,7 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     kubernetesClients.computeIfAbsent(kubernetesInfo, kInfo => 
buildKubernetesClient(kInfo))
   }
 
-  private def metadataManager = KyuubiServer.kyuubiServer.backendService
-    .sessionManager.asInstanceOf[KyuubiSessionManager].metadataManager
+  private var metadataManager: Option[MetadataManager] = _
 
   // Visible for testing
   private[engine] def checkKubernetesInfo(kubernetesInfo: KubernetesInfo): 
Unit = {
@@ -113,8 +111,9 @@ class KubernetesApplicationOperation extends 
ApplicationOperation with Logging {
     }
   }
 
-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf, metadataManager: 
Option[MetadataManager]): Unit = {
     kyuubiConf = conf
+    this.metadataManager = metadataManager
     info("Start initializing Kubernetes application operation.")
     submitTimeout = conf.get(KyuubiConf.ENGINE_KUBERNETES_SUBMIT_TIMEOUT)
     // Defer cleaning terminated application information
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index f0965a05aa..cd2101baf8 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -29,10 +29,12 @@ import org.apache.kyuubi.config.KyuubiConf
 import 
org.apache.kyuubi.engine.KubernetesApplicationOperation.LABEL_KYUUBI_UNIQUE_KEY
 import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
 import org.apache.kyuubi.engine.spark.SparkProcessBuilder
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.util.reflect.ReflectUtils._
 
-class KyuubiApplicationManager extends 
AbstractService("KyuubiApplicationManager") {
+class KyuubiApplicationManager(metadataManager: Option[MetadataManager])
+  extends AbstractService("KyuubiApplicationManager") {
 
   // TODO: maybe add a configuration is better
   private val operations =
@@ -41,7 +43,7 @@ class KyuubiApplicationManager extends 
AbstractService("KyuubiApplicationManager
   override def initialize(conf: KyuubiConf): Unit = {
     operations.foreach { op =>
       try {
-        op.initialize(conf)
+        op.initialize(conf, metadataManager)
       } catch {
         case NonFatal(e) => warn(s"Error starting 
${op.getClass.getSimpleName}: ${e.getMessage}")
       }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
index 9d6ca32fab..39ee294873 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/YarnApplicationOperation.scala
@@ -32,6 +32,7 @@ import org.apache.kyuubi.config.KyuubiConf.YarnUserStrategy._
 import org.apache.kyuubi.engine.ApplicationOperation._
 import org.apache.kyuubi.engine.ApplicationState.ApplicationState
 import org.apache.kyuubi.engine.YarnApplicationOperation.toApplicationState
+import org.apache.kyuubi.server.metadata.MetadataManager
 import org.apache.kyuubi.util.KyuubiHadoopUtils
 
 class YarnApplicationOperation extends ApplicationOperation with Logging {
@@ -40,7 +41,7 @@ class YarnApplicationOperation extends ApplicationOperation 
with Logging {
   @volatile private var adminYarnClient: Option[YarnClient] = None
   private var submitTimeout: Long = _
 
-  override def initialize(conf: KyuubiConf): Unit = {
+  override def initialize(conf: KyuubiConf, metadataManager: 
Option[MetadataManager]): Unit = {
     submitTimeout = conf.get(KyuubiConf.ENGINE_YARN_SUBMIT_TIMEOUT)
     yarnConf = KyuubiHadoopUtils.newYarnConfiguration(conf)
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index 9edc8218eb..13869051a5 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -51,12 +51,11 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
 
   val operationManager = new KyuubiOperationManager()
   val credentialsManager = new HadoopCredentialsManager()
-  val applicationManager = new KyuubiApplicationManager()
 
   // Currently, the metadata manager is used by the REST frontend which 
provides batch job APIs,
   // so we initialize it only when Kyuubi starts with the REST frontend.
-  lazy val metadataManager: Option[MetadataManager] =
-    if (conf.isRESTEnabled) Some(new MetadataManager()) else None
+  var metadataManager: Option[MetadataManager] = None
+  var applicationManager: KyuubiApplicationManager = _
 
   // lazy is required for plugins since the conf is null when this class 
initialization
   lazy val sessionConfAdvisor: Seq[SessionConfAdvisor] = 
PluginLoader.loadSessionConfAdvisor(conf)
@@ -73,6 +72,8 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
 
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
+    if (conf.isRESTEnabled) metadataManager = Some(new MetadataManager())
+    applicationManager = new KyuubiApplicationManager(metadataManager)
     addService(applicationManager)
     addService(credentialsManager)
     metadataManager.foreach(addService)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
index 8d3f7b17d8..eef2a024b1 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithKyuubiServerOnYarn.scala
@@ -42,7 +42,7 @@ sealed trait WithKyuubiServerOnYarn extends WithKyuubiServer {
 
   protected lazy val yarnOperation: YarnApplicationOperation = {
     val operation = new YarnApplicationOperation()
-    operation.initialize(miniYarnService.getConf)
+    operation.initialize(miniYarnService.getConf, None)
     operation
   }
 
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
index b667d9c39e..c1e9a25c80 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/JpsApplicationOperationSuite.scala
@@ -35,7 +35,7 @@ import org.apache.kyuubi.util.reflect.ReflectUtils._
 class JpsApplicationOperationSuite extends KyuubiFunSuite {
   private val jps = loadFromServiceLoader[ApplicationOperation]()
     .find(_.getClass.isAssignableFrom(classOf[JpsApplicationOperation])).get
-  jps.initialize(null)
+  jps.initialize(null, None)
 
   test("JpsApplicationOperation with jstat") {
     assert(jps.isSupported(ApplicationManagerInfo(None)))
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
index 2ea1939d2f..7454ad6529 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/KubernetesApplicationOperationSuite.scala
@@ -28,7 +28,7 @@ class KubernetesApplicationOperationSuite extends 
KyuubiFunSuite {
     kyuubiConf.set(KyuubiConf.KUBERNETES_NAMESPACE_ALLOW_LIST.key, "ns1,ns2")
 
     val operation = new KubernetesApplicationOperation()
-    operation.initialize(kyuubiConf)
+    operation.initialize(kyuubiConf, None)
 
     operation.checkKubernetesInfo(KubernetesInfo(None, None))
     operation.checkKubernetesInfo(KubernetesInfo(Some("1"), None))
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
index bc53563f78..cd0e0d6c70 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/AdminResourceSuite.scala
@@ -49,7 +49,7 @@ import 
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CL
 
 class AdminResourceSuite extends KyuubiFunSuite with RestFrontendTestHelper {
 
-  private val engineMgr = new KyuubiApplicationManager()
+  private val engineMgr = new KyuubiApplicationManager(None)
 
   override protected lazy val conf: KyuubiConf = KyuubiConf()
     .set(AUTHENTICATION_METHOD, Seq("CUSTOM"))

Reply via email to