This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c71415f99 [KYUUBI #6646] Fix RESTful API NPE when restarting Kyuubi 
server
4c71415f99 is described below

commit 4c71415f99653b41f15e4f8a37ae31e096e84558
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Aug 29 14:21:40 2024 -0700

    [KYUUBI #6646] Fix RESTful API NPE when restarting Kyuubi server
    
    # :mag: Description
    ## Issue References ๐Ÿ”—
    
    We meet issue when restarting the kyuubi server:
    ```
    2024-08-28 09:48:41.507 WARN 
org.apache.kyuubi.server.api.RestExceptionMapper: Error occurs on accessing 
REST API.
    java.lang.NullPointerException: Cannot invoke 
"org.apache.kyuubi.server.KyuubiServer.getConf()" because the return value of 
"org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null
            at 
org.apache.kyuubi.server.api.v1.BatchesResource.batchV2Enabled(BatchesResource.scala:72)
 ~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0]
            at 
org.apache.kyuubi.server.api.v1.BatchesResource.$anonfun$batchInfo$3(BatchesResource.scala:345)
 ~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0]
            at scala.Option.map(Option.scala:230) ~[scala-library-2.12.18.jar:?]
    ```
    
    The root cause is that, the `KyuubiServer.kyuubiServer` is null until the 
kyuubi server start finished.
    
    
https://github.com/apache/kyuubi/blob/db57e9365d7933942197936ac7ff711d58f7ea91/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala#L231-L234
    
    But the RESTful api initialization finished before than that, so, NPE will 
be thrown between REST front started to all the kyuubi service started.
    ## Describe Your Solution ๐Ÿ”ง
    
    Please include a summary of the change and which issue is fixed. Please 
also include relevant motivation and context. List any dependencies that are 
required for this change.
    
    In this PR,
    1. move the KyuubiBatchService into KyuubiRestFrontendService
    2. move the TempFileService (introduced in #6587) into KyuubiSessionManager
    
    <img width="872" alt="image" 
src="https://github.com/user-attachments/assets/a4c5fe38-1d34-4fee-933f-72511bc06f27";>
    
    ## Types of changes :bookmark:
    
    - [x] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [ ] Breaking change (fix or feature that would cause existing 
functionality to change)
    
    ## Test Plan ๐Ÿงช
    
    #### Behavior Without This Pull Request :coffin:
    
    #### Behavior With This Pull Request :tada:
    
    #### Related Unit Tests
    
    ---
    
    # Checklist ๐Ÿ“
    
    - [x] This patch was not authored or co-authored using [Generative 
Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6646 from turboFei/npe.
    
    Closes #6646
    
    42c1b7512 [Wang, Fei] common
    42b305557 [Wang, Fei] fix
    e0f58d234 [Wang, Fei] fix
    84492f21a [Wang, Fei] move code
    530eb21c1 [Wang, Fei] fix
    bc206ab25 [Wang, Fei] unused
    dd8908861 [Wang, Fei] prevent NPE
    4e6d39ecf [Wang, Fei] npe
    
    Authored-by: Wang, Fei <[email protected]>
    Signed-off-by: Wang, Fei <[email protected]>
---
 .../main/scala/org/apache/kyuubi/engine/EngineRef.scala    |  5 ++---
 .../org/apache/kyuubi/server/KyuubiBatchService.scala      |  8 ++------
 .../apache/kyuubi/server/KyuubiRestFrontendService.scala   |  8 ++++++++
 .../main/scala/org/apache/kyuubi/server/KyuubiServer.scala | 14 ++------------
 .../src/main/scala/org/apache/kyuubi/server/api/api.scala  |  7 +------
 .../org/apache/kyuubi/server/api/v1/AdminResource.scala    |  2 +-
 .../org/apache/kyuubi/server/api/v1/BatchesResource.scala  | 10 ++++------
 .../org/apache/kyuubi/session/KyuubiSessionManager.scala   |  4 ++--
 .../org/apache/kyuubi/server/TempFileServiceSuite.scala    |  4 +++-
 .../apache/kyuubi/server/api/v1/BatchesResourceSuite.scala |  4 ++--
 10 files changed, 27 insertions(+), 39 deletions(-)

diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 30bf32c39a..b7985fcf53 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -43,7 +43,6 @@ import 
org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT,
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.plugin.GroupProvider
-import org.apache.kyuubi.server.KyuubiServer
 
 /**
  * The description and functionality of an engine at server side
@@ -72,8 +71,8 @@ private[kyuubi] class EngineRef(
   private val engineType: EngineType = 
EngineType.withName(conf.get(ENGINE_TYPE))
 
   // Server-side engine pool size threshold
-  private val poolThreshold: Int = 
Option(KyuubiServer.kyuubiServer).map(_.getConf)
-    .getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
+  private val poolThreshold: Int =
+    
Option(engineManager).map(_.getConf).getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
 
   private val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE)
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index c099f2cb9c..38bb999c34 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -23,19 +23,15 @@ import 
org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
 import org.apache.kyuubi.engine.ApplicationState
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.server.metadata.MetadataManager
-import org.apache.kyuubi.service.{AbstractService, Serverable}
+import org.apache.kyuubi.service.AbstractService
 import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.ThreadUtils
 
 class KyuubiBatchService(
-    server: Serverable,
+    restFrontend: KyuubiRestFrontendService,
     sessionManager: KyuubiSessionManager)
   extends AbstractService(classOf[KyuubiBatchService].getSimpleName) {
 
-  private lazy val restFrontend = server.frontendServices
-    .filter(_.isInstanceOf[KyuubiRestFrontendService])
-    .head.asInstanceOf[KyuubiRestFrontendService]
-
   private def kyuubiInstance: String = restFrontend.connectionUrl
 
   // TODO expose metrics, including pending/running/succeeded/failed batches
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index ca2bbbe024..706bea8ab3 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -57,6 +57,13 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
 
   private val batchChecker = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-checker")
 
+  private[kyuubi] lazy val batchService: Option[KyuubiBatchService] =
+    if (conf.get(BATCH_SUBMITTER_ENABLED)) {
+      Some(new KyuubiBatchService(this, sessionManager))
+    } else {
+      None
+    }
+
   lazy val host: String = conf.get(FRONTEND_REST_BIND_HOST)
     .getOrElse {
       if (JavaUtils.isWindows || JavaUtils.isMac) {
@@ -92,6 +99,7 @@ class KyuubiRestFrontendService(override val serverable: 
Serverable)
       conf.get(FRONTEND_REST_MAX_WORKER_THREADS),
       conf.get(FRONTEND_REST_JETTY_STOP_TIMEOUT),
       conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED))
+    batchService.foreach(addService)
     super.initialize(conf)
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index f8c9ffa1b4..338ac6b414 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -25,14 +25,14 @@ import org.apache.hadoop.security.UserGroupInformation
 
 import org.apache.kyuubi._
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{BATCH_SUBMITTER_ENABLED, 
FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
+import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS, 
FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
 import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
 import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent, 
ServerEventHandlerRegister}
 import org.apache.kyuubi.ha.HighAvailabilityConf._
 import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
 import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
 import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
-import org.apache.kyuubi.service.{AbstractBackendService, 
AbstractFrontendService, Serverable, ServiceState, TempFileService}
+import org.apache.kyuubi.service.{AbstractBackendService, 
AbstractFrontendService, Serverable, ServiceState}
 import org.apache.kyuubi.session.KyuubiSessionManager
 import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
 import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
@@ -202,8 +202,6 @@ class KyuubiServer(name: String) extends Serverable(name) {
         throw new UnsupportedOperationException(s"Frontend protocol $other is 
not supported yet.")
     }
 
-  final var tempFileService: TempFileService = _
-
   override def initialize(conf: KyuubiConf): Unit = synchronized {
     val kinit = new KinitAuxiliaryService()
     addService(kinit)
@@ -211,18 +209,10 @@ class KyuubiServer(name: String) extends Serverable(name) 
{
     val periodicGCService = new PeriodicGCService
     addService(periodicGCService)
 
-    tempFileService = new TempFileService
-    addService(tempFileService)
-
     if (conf.get(MetricsConf.METRICS_ENABLED)) {
       addService(new MetricsSystem)
     }
 
-    if (conf.isRESTEnabled && conf.get(BATCH_SUBMITTER_ENABLED)) {
-      addService(new KyuubiBatchService(
-        this,
-        backendService.sessionManager.asInstanceOf[KyuubiSessionManager]))
-    }
     super.initialize(conf)
 
     initLoggerEventHandler(conf)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
index 93953a577d..b561315420 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
@@ -26,7 +26,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider}
 import org.eclipse.jetty.server.handler.ContextHandler
 
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.server.{KyuubiBatchService, 
KyuubiRestFrontendService, KyuubiServer}
+import org.apache.kyuubi.server.KyuubiRestFrontendService
 
 private[api] trait ApiRequestContext {
 
@@ -36,11 +36,6 @@ private[api] trait ApiRequestContext {
   @Context
   protected var httpRequest: HttpServletRequest = _
 
-  protected lazy val batchService: Option[KyuubiBatchService] =
-    KyuubiServer.kyuubiServer.getServices
-      .find(_.isInstanceOf[KyuubiBatchService])
-      .map(_.asInstanceOf[KyuubiBatchService])
-
   final protected def fe: KyuubiRestFrontendService = 
FrontendServiceContext.get(servletContext)
 }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index 0f1544fe68..4edae8e7ab 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -461,7 +461,7 @@ private[v1] class AdminResource extends ApiRequestContext 
with Logging {
       throw new NotAllowedException(
         s"$userName is not allowed to count the batches")
     }
-    val batchCount = batchService
+    val batchCount = fe.batchService
       .map(_.countBatch(batchType, Option(batchUser), Option(batchState)))
       .getOrElse(0)
     new Count(batchCount)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index de69cf3771..d370ee2cb9 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -43,8 +43,6 @@ import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiReservedKeys._
 import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo, 
ApplicationState, KillResponse, KyuubiApplicationManager}
 import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation, 
OperationState}
-import org.apache.kyuubi.server.KyuubiServer
-import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
 import org.apache.kyuubi.server.api.ApiRequestContext
 import org.apache.kyuubi.server.api.v1.BatchesResource._
 import org.apache.kyuubi.server.metadata.MetadataManager
@@ -68,7 +66,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
     fe.getConf.get(ENGINE_SECURITY_ENABLED)
 
   private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
-    KyuubiServer.kyuubiServer.getConf.get(BATCH_SUBMITTER_ENABLED) &&
+    fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
     reqConf.getOrElse(BATCH_IMPL_VERSION.key, 
fe.getConf.get(BATCH_IMPL_VERSION)) == "2"
   }
 
@@ -511,7 +509,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
         } else if (batchV2Enabled(metadata.requestConf) && metadata.state == 
"INITIALIZED" &&
           // there is a chance that metadata is outdated, then 
`cancelUnscheduledBatch` fails
           // and returns false
-          batchService.get.cancelUnscheduledBatch(batchId)) {
+          fe.batchService.get.cancelUnscheduledBatch(batchId)) {
           new CloseBatchResponse(true, s"Unscheduled batch $batchId is 
canceled.")
         } else if (batchV2Enabled(metadata.requestConf) && 
metadata.kyuubiInstance == null) {
           // code goes here indicates metadata is outdated, recursively calls 
itself to refresh
@@ -569,7 +567,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
       uploadFileFolderPath: JPath): Unit = {
     try {
       val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath, 
fileName)
-      kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+      fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath)
       request.setResource(tempFile.getPath)
     } catch {
       case e: Exception =>
@@ -605,7 +603,7 @@ private[v1] class BatchesResource extends ApiRequestContext 
with Logging {
                   filePart.getValueAs(classOf[InputStream]),
                   uploadFileFolderPath,
                   fileName)
-                
kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+                
fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath)
                 tempFile.getPath
               } catch {
                 case e: Exception =>
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index a20b4bc97c..3c163520f9 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -36,7 +36,6 @@ import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
 import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader, 
SessionConfAdvisor}
-import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
 import org.apache.kyuubi.server.metadata.{MetadataManager, 
MetadataRequestsRetryRef}
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
 import org.apache.kyuubi.service.TempFileService
@@ -73,12 +72,13 @@ class KyuubiSessionManager private (name: String) extends 
SessionManager(name) {
   private val engineConnectionAliveChecker =
     
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
 
-  def tempFileService: TempFileService = kyuubiServer.tempFileService
+  val tempFileService = new TempFileService()
 
   override def initialize(conf: KyuubiConf): Unit = {
     this.conf = conf
     addService(applicationManager)
     addService(credentialsManager)
+    addService(tempFileService)
     metadataManager.foreach(addService)
     initSessionLimiter(conf)
     initEngineStartupProcessSemaphore(conf)
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
index 4b7568c1c7..656aef5adb 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.{Utils, WithKyuubiServer}
 import org.apache.kyuubi.Utils.writeToTempFile
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME
+import org.apache.kyuubi.session.KyuubiSessionManager
 
 class TempFileServiceSuite extends WithKyuubiServer {
   private val expirationInMs = 100
@@ -35,7 +36,8 @@ class TempFileServiceSuite extends WithKyuubiServer {
     .set(SERVER_TEMP_FILE_EXPIRE_TIME, 
Duration.ofMillis(expirationInMs).toMillis)
 
   test("file cleaned up after expiration") {
-    val tempFileService = KyuubiServer.kyuubiServer.tempFileService
+    val tempFileService =
+      
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].tempFileService
     (0 until 3).map { i =>
       val dir = Utils.createTempDir()
       writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir, 
s"$i.txt")
diff --git 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index ac287e9060..dcddc15e67 100644
--- 
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++ 
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -42,7 +42,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
 import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
 import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
 import org.apache.kyuubi.operation.OperationState.OperationState
-import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService}
+import org.apache.kyuubi.server.KyuubiRestFrontendService
 import 
org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader, 
AUTHORIZATION_HEADER}
 import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
 import 
org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl, 
AuthUtils}
@@ -64,7 +64,7 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase 
{
 
   override def afterEach(): Unit = {
     val sessionManager = 
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
-    val batchService = server.getServices.collectFirst { case b: 
KyuubiBatchService => b }.get
+    val batchService = 
fe.asInstanceOf[KyuubiRestFrontendService].batchService.get
     sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0, 
Int.MaxValue)
       .foreach { batch => batchService.cancelUnscheduledBatch(batch.getId) }
     super.afterEach()

Reply via email to