This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4c71415f99 [KYUUBI #6646] Fix RESTful API NPE when restarting Kyuubi
server
4c71415f99 is described below
commit 4c71415f99653b41f15e4f8a37ae31e096e84558
Author: Wang, Fei <[email protected]>
AuthorDate: Thu Aug 29 14:21:40 2024 -0700
[KYUUBI #6646] Fix RESTful API NPE when restarting Kyuubi server
# :mag: Description
## Issue References
We hit the following NPE when restarting the Kyuubi server:
```
2024-08-28 09:48:41.507 WARN
org.apache.kyuubi.server.api.RestExceptionMapper: Error occurs on accessing
REST API.
java.lang.NullPointerException: Cannot invoke
"org.apache.kyuubi.server.KyuubiServer.getConf()" because the return value of
"org.apache.kyuubi.server.KyuubiServer$.kyuubiServer()" is null
at
org.apache.kyuubi.server.api.v1.BatchesResource.batchV2Enabled(BatchesResource.scala:72)
~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0]
at
org.apache.kyuubi.server.api.v1.BatchesResource.$anonfun$batchInfo$3(BatchesResource.scala:345)
~[kyuubi-server_2.12-1.10.0.0.1.0.jar:1.10.0.0.1.0]
at scala.Option.map(Option.scala:230) ~[scala-library-2.12.18.jar:?]
```
The root cause is that `KyuubiServer.kyuubiServer` stays null until the
Kyuubi server has finished starting:
https://github.com/apache/kyuubi/blob/db57e9365d7933942197936ac7ff711d58f7ea91/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala#L231-L234
However, the RESTful API finishes initializing earlier, so any request that
arrives after the REST frontend has started but before all Kyuubi services
have started hits an NPE.
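The startup window described above can be illustrated with a minimal sketch. All names here (`SketchConf`, `SketchServer`, `SketchFrontend`) are hypothetical stand-ins and not the real Kyuubi classes; the sketch only assumes a global reference that is assigned after the frontend is already serving requests.

```scala
// Hypothetical, simplified stand-ins; these are NOT the real Kyuubi classes.
final class SketchConf(val batchSubmitterEnabled: Boolean)

final class SketchServer(val conf: SketchConf)

object SketchServer {
  // Mirrors the pattern described above: the global reference is only
  // assigned once the whole server has finished starting.
  @volatile var instance: SketchServer = null
}

final class SketchFrontend(val conf: SketchConf) {
  // Unsafe: dereferences the global, which may still be null.
  def batchV2EnabledViaGlobal: Boolean =
    SketchServer.instance.conf.batchSubmitterEnabled

  // Safe: uses the configuration the frontend itself was initialized with.
  def batchV2EnabledViaFrontend: Boolean = conf.batchSubmitterEnabled
}

object StartupRaceSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SketchConf(batchSubmitterEnabled = true)
    val frontend = new SketchFrontend(conf) // the REST frontend is up first

    // A request arriving now sees a null global and fails.
    val early = scala.util.Try(frontend.batchV2EnabledViaGlobal)
    println(s"global lookup failed during startup: ${early.isFailure}")
    println(s"frontend lookup during startup: ${frontend.batchV2EnabledViaFrontend}")

    SketchServer.instance = new SketchServer(conf) // server start finishes later
    println(s"global lookup after startup: ${frontend.batchV2EnabledViaGlobal}")
  }
}
```

Reading the configuration from the frontend removes the dependency on startup ordering, because the frontend's own state exists for as long as it can receive requests.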
## Describe Your Solution
This PR removes the REST resources' dependency on `KyuubiServer.kyuubiServer`:
1. Move `KyuubiBatchService` into `KyuubiRestFrontendService`.
2. Move `TempFileService` (introduced in #6587) into `KyuubiSessionManager`.
<img width="872" alt="image"
src="https://github.com/user-attachments/assets/a4c5fe38-1d34-4fee-933f-72511bc06f27">
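The ownership change can be sketched roughly as follows. This is an illustrative model only: `TempFiles`, `Sessions`, `Batches`, `Frontend`, and `Resource` are hypothetical names, and the `localhost:10099` URL is a placeholder. The point is that a resource reaches both services through the frontend it is attached to, never through a global server reference.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins; these are NOT the real Kyuubi classes.
final class TempFiles {
  private val paths = mutable.Set.empty[String]
  def addPathToExpiration(path: String): Unit = { paths.add(path); () }
  def tracked: Set[String] = paths.toSet
}

final class Sessions {
  // Owned by the session manager, so it exists as soon as the manager does.
  val tempFiles = new TempFiles
}

final class Batches(frontend: Frontend) {
  def kyuubiInstance: String = frontend.connectionUrl
}

final class Frontend(batchEnabled: Boolean, val sessions: Sessions) {
  val connectionUrl: String = "localhost:10099" // placeholder value

  // Created on first access, and only when the feature is enabled.
  lazy val batchService: Option[Batches] =
    if (batchEnabled) Some(new Batches(this)) else None
}

// A resource handler that only talks to the frontend it is attached to.
final class Resource(fe: Frontend) {
  def batchInstance: Option[String] = fe.batchService.map(_.kyuubiInstance)
  def registerUpload(path: String): Unit =
    fe.sessions.tempFiles.addPathToExpiration(path)
}

object OwnershipSketch {
  def main(args: Array[String]): Unit = {
    val fe = new Frontend(batchEnabled = true, sessions = new Sessions)
    val resource = new Resource(fe)
    resource.registerUpload("/tmp/upload.jar")
    println(resource.batchInstance)
    println(fe.sessions.tempFiles.tracked)
  }
}
```

Modeling the batch service as an `Option` keeps the disabled case explicit, so callers fall back with `map`/`getOrElse` instead of assuming the service is present.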
## Types of changes :bookmark:
- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
## Test Plan
#### Behavior Without This Pull Request :coffin:
#### Behavior With This Pull Request :tada:
#### Related Unit Tests
---
# Checklist
- [x] This patch was not authored or co-authored using [Generative
Tooling](https://www.apache.org/legal/generative-tooling.html)
Closes #6646 from turboFei/npe.
Closes #6646
42c1b7512 [Wang, Fei] common
42b305557 [Wang, Fei] fix
e0f58d234 [Wang, Fei] fix
84492f21a [Wang, Fei] move code
530eb21c1 [Wang, Fei] fix
bc206ab25 [Wang, Fei] unused
dd8908861 [Wang, Fei] prevent NPE
4e6d39ecf [Wang, Fei] npe
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../main/scala/org/apache/kyuubi/engine/EngineRef.scala | 5 ++---
.../org/apache/kyuubi/server/KyuubiBatchService.scala | 8 ++------
.../apache/kyuubi/server/KyuubiRestFrontendService.scala | 8 ++++++++
.../main/scala/org/apache/kyuubi/server/KyuubiServer.scala | 14 ++------------
.../src/main/scala/org/apache/kyuubi/server/api/api.scala | 7 +------
.../org/apache/kyuubi/server/api/v1/AdminResource.scala | 2 +-
.../org/apache/kyuubi/server/api/v1/BatchesResource.scala | 10 ++++------
.../org/apache/kyuubi/session/KyuubiSessionManager.scala | 4 ++--
.../org/apache/kyuubi/server/TempFileServiceSuite.scala | 4 +++-
.../apache/kyuubi/server/api/v1/BatchesResourceSuite.scala | 4 ++--
10 files changed, 27 insertions(+), 39 deletions(-)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index 30bf32c39a..b7985fcf53 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -43,7 +43,6 @@ import
org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, ENGINE_TIMEOUT,
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.plugin.GroupProvider
-import org.apache.kyuubi.server.KyuubiServer
/**
* The description and functionality of an engine at server side
@@ -72,8 +71,8 @@ private[kyuubi] class EngineRef(
private val engineType: EngineType =
EngineType.withName(conf.get(ENGINE_TYPE))
// Server-side engine pool size threshold
- private val poolThreshold: Int =
Option(KyuubiServer.kyuubiServer).map(_.getConf)
- .getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
+ private val poolThreshold: Int =
+
Option(engineManager).map(_.getConf).getOrElse(KyuubiConf()).get(ENGINE_POOL_SIZE_THRESHOLD)
private val clientPoolSize: Int = conf.get(ENGINE_POOL_SIZE)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
index c099f2cb9c..38bb999c34 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiBatchService.scala
@@ -23,19 +23,15 @@ import
org.apache.kyuubi.config.KyuubiConf.BATCH_SUBMITTER_THREADS
import org.apache.kyuubi.engine.ApplicationState
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.server.metadata.MetadataManager
-import org.apache.kyuubi.service.{AbstractService, Serverable}
+import org.apache.kyuubi.service.AbstractService
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.ThreadUtils
class KyuubiBatchService(
- server: Serverable,
+ restFrontend: KyuubiRestFrontendService,
sessionManager: KyuubiSessionManager)
extends AbstractService(classOf[KyuubiBatchService].getSimpleName) {
- private lazy val restFrontend = server.frontendServices
- .filter(_.isInstanceOf[KyuubiRestFrontendService])
- .head.asInstanceOf[KyuubiRestFrontendService]
-
private def kyuubiInstance: String = restFrontend.connectionUrl
// TODO expose metrics, including pending/running/succeeded/failed batches
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
index ca2bbbe024..706bea8ab3 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiRestFrontendService.scala
@@ -57,6 +57,13 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
private val batchChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("batch-checker")
+ private[kyuubi] lazy val batchService: Option[KyuubiBatchService] =
+ if (conf.get(BATCH_SUBMITTER_ENABLED)) {
+ Some(new KyuubiBatchService(this, sessionManager))
+ } else {
+ None
+ }
+
lazy val host: String = conf.get(FRONTEND_REST_BIND_HOST)
.getOrElse {
if (JavaUtils.isWindows || JavaUtils.isMac) {
@@ -92,6 +99,7 @@ class KyuubiRestFrontendService(override val serverable:
Serverable)
conf.get(FRONTEND_REST_MAX_WORKER_THREADS),
conf.get(FRONTEND_REST_JETTY_STOP_TIMEOUT),
conf.get(FRONTEND_JETTY_SEND_VERSION_ENABLED))
+ batchService.foreach(addService)
super.initialize(conf)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
index f8c9ffa1b4..338ac6b414 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiServer.scala
@@ -25,14 +25,14 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi._
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.config.KyuubiConf.{BATCH_SUBMITTER_ENABLED,
FRONTEND_PROTOCOLS, FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
+import org.apache.kyuubi.config.KyuubiConf.{FRONTEND_PROTOCOLS,
FrontendProtocols, KYUUBI_KUBERNETES_CONF_PREFIX}
import org.apache.kyuubi.config.KyuubiConf.FrontendProtocols._
import org.apache.kyuubi.events.{EventBus, KyuubiServerInfoEvent,
ServerEventHandlerRegister}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.{AuthTypes, ServiceDiscovery}
import org.apache.kyuubi.metrics.{MetricsConf, MetricsSystem}
import org.apache.kyuubi.server.metadata.jdbc.JDBCMetadataStoreConf
-import org.apache.kyuubi.service.{AbstractBackendService,
AbstractFrontendService, Serverable, ServiceState, TempFileService}
+import org.apache.kyuubi.service.{AbstractBackendService,
AbstractFrontendService, Serverable, ServiceState}
import org.apache.kyuubi.session.KyuubiSessionManager
import org.apache.kyuubi.util.{KyuubiHadoopUtils, SignalRegister}
import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
@@ -202,8 +202,6 @@ class KyuubiServer(name: String) extends Serverable(name) {
throw new UnsupportedOperationException(s"Frontend protocol $other is
not supported yet.")
}
- final var tempFileService: TempFileService = _
-
override def initialize(conf: KyuubiConf): Unit = synchronized {
val kinit = new KinitAuxiliaryService()
addService(kinit)
@@ -211,18 +209,10 @@ class KyuubiServer(name: String) extends Serverable(name)
{
val periodicGCService = new PeriodicGCService
addService(periodicGCService)
- tempFileService = new TempFileService
- addService(tempFileService)
-
if (conf.get(MetricsConf.METRICS_ENABLED)) {
addService(new MetricsSystem)
}
- if (conf.isRESTEnabled && conf.get(BATCH_SUBMITTER_ENABLED)) {
- addService(new KyuubiBatchService(
- this,
- backendService.sessionManager.asInstanceOf[KyuubiSessionManager]))
- }
super.initialize(conf)
initLoggerEventHandler(conf)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
index 93953a577d..b561315420 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/api.scala
@@ -26,7 +26,7 @@ import javax.ws.rs.ext.{ExceptionMapper, Provider}
import org.eclipse.jetty.server.handler.ContextHandler
import org.apache.kyuubi.Logging
-import org.apache.kyuubi.server.{KyuubiBatchService,
KyuubiRestFrontendService, KyuubiServer}
+import org.apache.kyuubi.server.KyuubiRestFrontendService
private[api] trait ApiRequestContext {
@@ -36,11 +36,6 @@ private[api] trait ApiRequestContext {
@Context
protected var httpRequest: HttpServletRequest = _
- protected lazy val batchService: Option[KyuubiBatchService] =
- KyuubiServer.kyuubiServer.getServices
- .find(_.isInstanceOf[KyuubiBatchService])
- .map(_.asInstanceOf[KyuubiBatchService])
-
final protected def fe: KyuubiRestFrontendService =
FrontendServiceContext.get(servletContext)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
index 0f1544fe68..4edae8e7ab 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/AdminResource.scala
@@ -461,7 +461,7 @@ private[v1] class AdminResource extends ApiRequestContext
with Logging {
throw new NotAllowedException(
s"$userName is not allowed to count the batches")
}
- val batchCount = batchService
+ val batchCount = fe.batchService
.map(_.countBatch(batchType, Option(batchUser), Option(batchState)))
.getOrElse(0)
new Count(batchCount)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
index de69cf3771..d370ee2cb9 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala
@@ -43,8 +43,6 @@ import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys._
import org.apache.kyuubi.engine.{ApplicationInfo, ApplicationManagerInfo,
ApplicationState, KillResponse, KyuubiApplicationManager}
import org.apache.kyuubi.operation.{BatchJobSubmission, FetchOrientation,
OperationState}
-import org.apache.kyuubi.server.KyuubiServer
-import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
import org.apache.kyuubi.server.api.ApiRequestContext
import org.apache.kyuubi.server.api.v1.BatchesResource._
import org.apache.kyuubi.server.metadata.MetadataManager
@@ -68,7 +66,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
fe.getConf.get(ENGINE_SECURITY_ENABLED)
private def batchV2Enabled(reqConf: Map[String, String]): Boolean = {
- KyuubiServer.kyuubiServer.getConf.get(BATCH_SUBMITTER_ENABLED) &&
+ fe.getConf.get(BATCH_SUBMITTER_ENABLED) &&
reqConf.getOrElse(BATCH_IMPL_VERSION.key,
fe.getConf.get(BATCH_IMPL_VERSION)) == "2"
}
@@ -511,7 +509,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
} else if (batchV2Enabled(metadata.requestConf) && metadata.state ==
"INITIALIZED" &&
// there is a chance that metadata is outdated, then
`cancelUnscheduledBatch` fails
// and returns false
- batchService.get.cancelUnscheduledBatch(batchId)) {
+ fe.batchService.get.cancelUnscheduledBatch(batchId)) {
new CloseBatchResponse(true, s"Unscheduled batch $batchId is
canceled.")
} else if (batchV2Enabled(metadata.requestConf) &&
metadata.kyuubiInstance == null) {
// code goes here indicates metadata is outdated, recursively calls
itself to refresh
@@ -569,7 +567,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
uploadFileFolderPath: JPath): Unit = {
try {
val tempFile = Utils.writeToTempFile(inputStream, uploadFileFolderPath,
fileName)
- kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+ fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath)
request.setResource(tempFile.getPath)
} catch {
case e: Exception =>
@@ -605,7 +603,7 @@ private[v1] class BatchesResource extends ApiRequestContext
with Logging {
filePart.getValueAs(classOf[InputStream]),
uploadFileFolderPath,
fileName)
-
kyuubiServer.tempFileService.addPathToExpiration(tempFile.toPath)
+
fe.sessionManager.tempFileService.addPathToExpiration(tempFile.toPath)
tempFile.getPath
} catch {
case e: Exception =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
index a20b4bc97c..3c163520f9 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionManager.scala
@@ -36,7 +36,6 @@ import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{KyuubiOperationManager, OperationState}
import org.apache.kyuubi.plugin.{GroupProvider, PluginLoader,
SessionConfAdvisor}
-import org.apache.kyuubi.server.KyuubiServer.kyuubiServer
import org.apache.kyuubi.server.metadata.{MetadataManager,
MetadataRequestsRetryRef}
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import org.apache.kyuubi.service.TempFileService
@@ -73,12 +72,13 @@ class KyuubiSessionManager private (name: String) extends
SessionManager(name) {
private val engineConnectionAliveChecker =
ThreadUtils.newDaemonSingleThreadScheduledExecutor(s"$name-engine-alive-checker")
- def tempFileService: TempFileService = kyuubiServer.tempFileService
+ val tempFileService = new TempFileService()
override def initialize(conf: KyuubiConf): Unit = {
this.conf = conf
addService(applicationManager)
addService(credentialsManager)
+ addService(tempFileService)
metadataManager.foreach(addService)
initSessionLimiter(conf)
initEngineStartupProcessSemaphore(conf)
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
index 4b7568c1c7..656aef5adb 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/TempFileServiceSuite.scala
@@ -27,6 +27,7 @@ import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.Utils.writeToTempFile
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf.SERVER_TEMP_FILE_EXPIRE_TIME
+import org.apache.kyuubi.session.KyuubiSessionManager
class TempFileServiceSuite extends WithKyuubiServer {
private val expirationInMs = 100
@@ -35,7 +36,8 @@ class TempFileServiceSuite extends WithKyuubiServer {
.set(SERVER_TEMP_FILE_EXPIRE_TIME,
Duration.ofMillis(expirationInMs).toMillis)
test("file cleaned up after expiration") {
- val tempFileService = KyuubiServer.kyuubiServer.tempFileService
+ val tempFileService =
+
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager].tempFileService
(0 until 3).map { i =>
val dir = Utils.createTempDir()
writeToTempFile(new ByteArrayInputStream(s"$i".getBytes()), dir,
s"$i.txt")
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
index ac287e9060..dcddc15e67 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/BatchesResourceSuite.scala
@@ -42,7 +42,7 @@ import org.apache.kyuubi.engine.spark.SparkBatchProcessBuilder
import org.apache.kyuubi.metrics.{MetricsConstants, MetricsSystem}
import org.apache.kyuubi.operation.{BatchJobSubmission, OperationState}
import org.apache.kyuubi.operation.OperationState.OperationState
-import org.apache.kyuubi.server.{KyuubiBatchService, KyuubiRestFrontendService}
+import org.apache.kyuubi.server.KyuubiRestFrontendService
import
org.apache.kyuubi.server.http.util.HttpAuthUtils.{basicAuthorizationHeader,
AUTHORIZATION_HEADER}
import org.apache.kyuubi.server.metadata.api.{Metadata, MetadataFilter}
import
org.apache.kyuubi.service.authentication.{AnonymousAuthenticationProviderImpl,
AuthUtils}
@@ -64,7 +64,7 @@ class BatchesV2ResourceSuite extends BatchesResourceSuiteBase
{
override def afterEach(): Unit = {
val sessionManager =
fe.be.sessionManager.asInstanceOf[KyuubiSessionManager]
- val batchService = server.getServices.collectFirst { case b:
KyuubiBatchService => b }.get
+ val batchService =
fe.asInstanceOf[KyuubiRestFrontendService].batchService.get
sessionManager.getBatchesFromMetadataStore(MetadataFilter(), 0,
Int.MaxValue)
.foreach { batch => batchService.cancelUnscheduledBatch(batch.getId) }
super.afterEach()